package org.gridgain.grid.internal.processors.cache.dr.ist;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.tree.updatelog.PartitionLogTree;
import org.apache.ignite.internal.processors.cache.tree.updatelog.UpdateLogRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.gridgain.grid.cache.dr.CacheDrEntryFilter;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMetrics;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.cache.dr.SerializedDrEntry;
import org.gridgain.grid.internal.processors.dr.DrUtils;

/* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheIncrementalDrHandler.class */
public class CacheIncrementalDrHandler implements DrStateAware {
    private final GridCacheContext cctx;
    private final Consumer<DrPartitionAwareJob> executor;
    private final IgniteLogger log;
    private final CacheSenderHubManager sndHubMgr;
    private final CachePartitionStateManager partStateMgr;
    private final Supplier<CacheDrMetrics> metrics;
    private final GridBusyLock busyLock;
    private final DrEntryFilterWrapper drFilter;
    private final int batchSendSizeBytes;
    private final Map<Integer, PartitionDrHandler> handlers = new ConcurrentHashMap();
    private final DrStateHolder drState;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheIncrementalDrHandler$IncrementalStateTransferJob.class */
    public class IncrementalStateTransferJob implements DrPartitionAwareJob {
        private final int part;
        private final PartitionDrHandler istHnd;
        private long startCntr;
        private volatile boolean cancelled;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheIncrementalDrHandler$IncrementalStateTransferJob$BatchImpl.class */
        public class BatchImpl implements DrBatchEventListener {
            private final int batchSendSizeBytes;
            private Collection<SerializedDrEntry> entries = new ConcurrentLinkedQueue();
            private int entriesCnt;
            private int batchSize;
            private final int part;
            private final long startCntr;
            private long endCntr;

            BatchImpl(int i, long j, int i2) {
                this.part = i;
                this.startCntr = j;
                this.batchSendSizeBytes = i2;
            }

            <K, V> void add(GridCacheRawVersionedEntry<K, V> gridCacheRawVersionedEntry) {
                SerializedDrEntry serializeEntry = CacheIncrementalDrHandler.this.serializeEntry(gridCacheRawVersionedEntry);
                this.entries.add(serializeEntry);
                this.entriesCnt++;
                this.batchSize += serializeEntry.size();
            }

            boolean readyToSend() {
                return this.batchSize > this.batchSendSizeBytes;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public Map<Byte, EntryBuffer> buffers() {
                HashMap hashMap = new HashMap();
                EntryBuffer entryBuffer = null;
                for (SerializedDrEntry serializedDrEntry : this.entries) {
                    byte dcID = serializedDrEntry.dcID();
                    if (dcID != -1) {
                        entryBuffer = (EntryBuffer) hashMap.computeIfAbsent(Byte.valueOf(dcID), b -> {
                            return new EntryBuffer(CacheIncrementalDrHandler.this.cctx);
                        });
                    }
                    entryBuffer.writeEntry(serializedDrEntry);
                }
                this.entries = null;
                return hashMap;
            }

            boolean isEmpty() {
                return this.entries.isEmpty();
            }

            public void endCntr(long j) {
                this.endCntr = j;
            }

            @Override // org.gridgain.grid.internal.processors.cache.dr.ist.DrBatchEventListener
            public void onAcked() {
                IncrementalStateTransferJob.this.onBatchAck(this.startCntr, this.endCntr);
            }

            @Override // org.gridgain.grid.internal.processors.cache.dr.ist.DrBatchEventListener
            public void onRejected(Throwable th) {
                IncrementalStateTransferJob.this.onBatchReject();
            }

            @Override // org.gridgain.grid.internal.processors.cache.dr.ist.DrBatchEventListener
            public void onSent() {
                IncrementalStateTransferJob.this.onBatchSent(this.startCntr, this.endCntr);
            }

            public String toString() {
                return "Batch[entriesCnt=" + this.entriesCnt + ", batchSize=" + this.batchSize + ", part=" + this.part + ", startCntr=" + this.startCntr + ", endCntr=" + this.endCntr + ']';
            }
        }

        IncrementalStateTransferJob(PartitionDrHandler partitionDrHandler, long j) {
            this.istHnd = partitionDrHandler;
            this.part = partitionDrHandler.part();
            this.startCntr = j;
        }

        @Override // org.gridgain.grid.internal.processors.cache.dr.Cancellable
        public boolean isCancelled() {
            return this.cancelled;
        }

        @Override // org.gridgain.grid.internal.processors.cache.dr.Cancellable
        public void cancel() {
            this.cancelled = true;
        }

        @Override // org.gridgain.grid.internal.processors.cache.dr.ist.DrPartitionAwareJob
        public int part() {
            return this.part;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                continueReplication();
            } catch (Exception e) {
                if (!this.cancelled) {
                    U.error(CacheIncrementalDrHandler.this.log, "Failed to start/resume incremental state transfer: cache=" + CacheIncrementalDrHandler.this.cctx.name(), e);
                } else if (CacheIncrementalDrHandler.this.log.isTraceEnabled()) {
                    CacheIncrementalDrHandler.this.log.trace("State transfer task has been stopped for cache: cache=" + CacheIncrementalDrHandler.this.cctx.name() + ", cause=" + e.getMessage());
                }
            }
        }

        private void continueReplication() {
            try {
                if (CacheIncrementalDrHandler.this.busyLock.enterBusy()) {
                    try {
                        GridDhtLocalPartition localPartition = CacheIncrementalDrHandler.this.cctx.topology().localPartition(this.part, AffinityTopologyVersion.NONE, false);
                        if (localPartition == null || !localPartition.reserve()) {
                            return;
                        }
                        try {
                            if (localPartition.state() != GridDhtPartitionState.OWNING || !CacheIncrementalDrHandler.this.cctx.affinity().primaryByPartition(CacheIncrementalDrHandler.this.cctx.localNode(), this.part, AffinityTopologyVersion.NONE)) {
                                CacheIncrementalDrHandler.this.busyLock.leaveBusy();
                                return;
                            }
                            long j = this.startCntr;
                            long lastUpdCntr = CacheIncrementalDrHandler.this.partStateMgr.lastUpdCntr(this.part);
                            if (j >= lastUpdCntr) {
                                localPartition.release();
                                this.istHnd.onTransferJobFinished(this);
                                CacheIncrementalDrHandler.this.busyLock.leaveBusy();
                                return;
                            }
                            PartitionLogTree logTree = CacheIncrementalDrHandler.this.cctx.offheap().dataStore(localPartition).logTree();
                            if (!$assertionsDisabled && logTree == null) {
                                throw new AssertionError();
                            }
                            BatchImpl batchImpl = null;
                            boolean z = false;
                            long j2 = j;
                            while (j < lastUpdCntr) {
                                GridCursor find = logTree.find(new UpdateLogRow(CacheIncrementalDrHandler.this.cctx.cacheId(), j + 1), new UpdateLogRow(CacheIncrementalDrHandler.this.cctx.cacheId(), lastUpdCntr), PartitionLogTree.FULL_ROW);
                                while (!isCancelled() && find.next()) {
                                    UpdateLogRow updateLogRow = (UpdateLogRow) find.get();
                                    if (!$assertionsDisabled && updateLogRow.updateCounter() <= j2) {
                                        throw new AssertionError();
                                    }
                                    if (batchImpl == null) {
                                        batchImpl = new BatchImpl(this.part, j2, CacheIncrementalDrHandler.this.batchSendSizeBytes);
                                    }
                                    addRowToBatch(batchImpl, updateLogRow);
                                    j2 = updateLogRow.updateCounter();
                                    if (batchImpl.readyToSend()) {
                                        batchImpl.endCntr(j2);
                                        send(batchImpl);
                                        z = true;
                                        batchImpl = null;
                                    }
                                }
                                if (isCancelled()) {
                                    localPartition.release();
                                    this.istHnd.onTransferJobFinished(this);
                                    CacheIncrementalDrHandler.this.busyLock.leaveBusy();
                                    return;
                                }
                                j = lastUpdCntr;
                                lastUpdCntr = CacheIncrementalDrHandler.this.partStateMgr.lastUpdCntr(this.part);
                            }
                            if (batchImpl != null && !batchImpl.isEmpty()) {
                                batchImpl.endCntr(lastUpdCntr);
                                send(batchImpl);
                                z = true;
                            }
                            if (!z) {
                                this.istHnd.onBatchSent(this, this.startCntr, lastUpdCntr);
                                this.istHnd.onBatchAcknowledged(this.startCntr, lastUpdCntr);
                            }
                            this.startCntr = lastUpdCntr;
                            localPartition.release();
                            this.istHnd.onTransferJobFinished(this);
                            CacheIncrementalDrHandler.this.busyLock.leaveBusy();
                        } finally {
                            localPartition.release();
                            this.istHnd.onTransferJobFinished(this);
                        }
                    } catch (Exception e) {
                        CacheIncrementalDrHandler.this.log.error("Failed to read cache for DR purposes: cache=" + CacheIncrementalDrHandler.this.cctx.name(), e);
                        throw new IgniteException(e);
                    }
                }
            } finally {
                CacheIncrementalDrHandler.this.busyLock.leaveBusy();
            }
        }

        public void addRowToBatch(BatchImpl batchImpl, UpdateLogRow updateLogRow) throws IgniteCheckedException {
            GridCacheRawVersionedEntry gridCacheRawVersionedEntry = new GridCacheRawVersionedEntry(updateLogRow.key(), updateLogRow.value().cacheObjectType() == -1 ? null : updateLogRow.value(), 0L, updateLogRow.expireTime(), updateLogRow.version());
            if (CacheIncrementalDrHandler.this.drFilter == null || CacheIncrementalDrHandler.this.drFilter.accept(gridCacheRawVersionedEntry)) {
                batchImpl.add(gridCacheRawVersionedEntry);
            } else {
                ((CacheDrMetrics) CacheIncrementalDrHandler.this.metrics.get()).onSenderCacheEntryFiltered();
            }
        }

        private void send(BatchImpl batchImpl) {
            if (CacheIncrementalDrHandler.this.log.isTraceEnabled()) {
                CacheIncrementalDrHandler.this.log.trace("Sending batch: " + batchImpl);
            }
            CacheIncrementalDrHandler.this.sndHubMgr.send(batchImpl.buffers(), batchImpl);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onBatchReject() {
            if (isCancelled()) {
                return;
            }
            this.istHnd.onBatchRejected(this);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onBatchSent(long j, long j2) {
            if (isCancelled()) {
                return;
            }
            this.istHnd.onBatchSent(this, j, j2);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onBatchAck(long j, long j2) {
            if (isCancelled()) {
                return;
            }
            this.istHnd.onBatchAcknowledged(j, j2);
        }

        static {
            $assertionsDisabled = !CacheIncrementalDrHandler.class.desiredAssertionStatus();
        }
    }

    public CacheIncrementalDrHandler(GridCacheContext gridCacheContext, IgniteLogger igniteLogger, CachePartitionStateManager cachePartitionStateManager, CacheSenderHubManager cacheSenderHubManager, DrStateHolder drStateHolder, GridBusyLock gridBusyLock, CacheDrEntryFilter cacheDrEntryFilter, int i, Consumer<DrPartitionAwareJob> consumer, Supplier<CacheDrMetrics> supplier) {
        this.cctx = gridCacheContext;
        this.log = igniteLogger;
        this.partStateMgr = cachePartitionStateManager;
        this.sndHubMgr = cacheSenderHubManager;
        this.drState = drStateHolder;
        this.busyLock = gridBusyLock;
        this.batchSendSizeBytes = i;
        this.executor = consumer;
        this.metrics = supplier;
        this.drFilter = cacheDrEntryFilter == null ? null : new DrEntryFilterWrapper(gridCacheContext.cacheObjectContext(), cacheDrEntryFilter);
    }

    public void stop() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Stop incremental DR handler.");
        }
        this.handlers.values().forEach(partitionDrHandler -> {
            partitionDrHandler.cancel();
        });
    }

    @Override // org.gridgain.grid.internal.processors.cache.dr.ist.DrStateAware
    public void onPause() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Pause incremental DR.");
        }
        this.handlers.values().forEach(partitionDrHandler -> {
            partitionDrHandler.pauseTransfer();
        });
    }

    @Override // org.gridgain.grid.internal.processors.cache.dr.ist.DrStateAware
    public void onResume() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Resume incremental DR.");
        }
        this.handlers.values().forEach(partitionDrHandler -> {
            partitionDrHandler.resumeTransfer();
        });
    }

    <K, V> SerializedDrEntry serializeEntry(GridCacheRawVersionedEntry<K, V> gridCacheRawVersionedEntry) {
        try {
            gridCacheRawVersionedEntry.marshal(this.cctx.cacheObjectContext(), this.cctx.gridConfig().getMarshaller());
            GridByteArrayOutputStream gridByteArrayOutputStream = new GridByteArrayOutputStream(DrUtils.drEntrySize(gridCacheRawVersionedEntry));
            DataOutputStream dataOutputStream = new DataOutputStream(gridByteArrayOutputStream);
            Throwable th = null;
            try {
                try {
                    DrUtils.writeDrEntry(dataOutputStream, gridCacheRawVersionedEntry);
                    if (dataOutputStream != null) {
                        if (0 != 0) {
                            try {
                                dataOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            dataOutputStream.close();
                        }
                    }
                    if ($assertionsDisabled || DrUtils.drEntrySize(gridCacheRawVersionedEntry) == gridByteArrayOutputStream.internalArray().length) {
                        return new SerializedDrEntry(gridCacheRawVersionedEntry.version().dataCenterId(), gridByteArrayOutputStream.internalArray());
                    }
                    throw new AssertionError();
                } finally {
                }
            } finally {
            }
        } catch (IgniteCheckedException | IOException e) {
            throw new IgniteException("Failed to marshal data for replication.", e);
        }
    }

    public void onPartitionCounterChanged(int i) {
        PartitionDrHandler partitionDrHandler = this.handlers.get(Integer.valueOf(i));
        if (partitionDrHandler != null) {
            partitionDrHandler.onUpdateCounterChanged();
        }
    }

    public void onPartitionAssignment(Set<Integer> set, Set<Integer> set2) {
        this.cctx.topology().localPartitions().forEach(gridDhtLocalPartition -> {
            int id = gridDhtLocalPartition.id();
            if (!set.contains(Integer.valueOf(id))) {
                PartitionDrHandler remove = this.handlers.remove(Integer.valueOf(id));
                if (remove != null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Cancel partition replication, owner changed: cache=" + this.cctx.name() + ", part=" + remove.part());
                    }
                    remove.cancel();
                }
                if (set2.contains(Integer.valueOf(id))) {
                    this.partStateMgr.cleanupAsync(id);
                    return;
                }
                return;
            }
            if (!$assertionsDisabled && gridDhtLocalPartition.state() != GridDhtPartitionState.OWNING) {
                throw new AssertionError(gridDhtLocalPartition.state());
            }
            if (this.log.isDebugEnabled() && !this.handlers.containsKey(Integer.valueOf(id))) {
                this.log.info("Start partition replication: cache=" + this.cctx.name() + ", part=" + id);
            }
            PartitionDrHandler computeIfAbsent = this.handlers.computeIfAbsent(Integer.valueOf(id), num -> {
                return new PartitionDrHandler(this.cctx, num, this, this.partStateMgr);
            });
            this.partStateMgr.moveLastPartCntr(id, gridDhtLocalPartition.updateCounter());
            computeIfAbsent.continueTransfer();
            this.partStateMgr.cleanupAsync(id);
        });
    }

    public boolean isActive() {
        return this.drState.isActive();
    }

    public DrPartitionAwareJob createJob(PartitionDrHandler partitionDrHandler, long j) {
        return new IncrementalStateTransferJob(partitionDrHandler, j);
    }

    public void submit(DrPartitionAwareJob drPartitionAwareJob) {
        this.executor.accept(drPartitionAwareJob);
    }

    static {
        $assertionsDisabled = !CacheIncrementalDrHandler.class.desiredAssertionStatus();
    }
}
