/*
 * Decompiled with CFR 0.152.
 */
package org.gridgain.grid.internal.processors.cache.dr;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheType;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridByteArrayOutputStream;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrResultType;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.cache.dr.GridGainCacheDrManager;
import org.gridgain.grid.internal.processors.cache.dr.Permit;
import org.gridgain.grid.internal.processors.cache.dr.SerializedDrEntry;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderAttributes;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequestEntry;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalResponse;
import org.jetbrains.annotations.Nullable;

public class CacheDrHandler {
    /** Generator of internal request identifiers; incremented once for every internal request created. */
    static final AtomicLong ID_GEN = new AtomicLong();
    /** NOTE(review): presumably the first resend delay in ms; the resend code uses the literal 100L - confirm. */
    static final int INITIAL_DELAY = 100;
    /** NOTE(review): presumably the resend delay cap in ms; the resend code uses the literal 10000L - confirm. */
    static final int MAX_DELAY = 10000;
    /** NOTE(review): presumably the delay in ms above which a request is forced; the resend code uses 500L - confirm. */
    static final int FORCE_REQUEST_THRESHOLD = 500;
    /** Context of the cache this handler works for. */
    @GridToStringExclude
    private final GridCacheContext cctx;
    /** Owning replication manager (queried for stop state, sender hubs and metrics). */
    private final GridGainCacheDrManager mgr;
    /** Sender configuration (batch size, send frequency, max batches, backup queue size). */
    @GridToStringExclude
    private final CacheDrSenderConfiguration ccfg;
    /** Topic on which responses to internal requests are received. */
    private final Object topic;
    /** Logger. */
    @GridToStringExclude
    private final IgniteLogger log;
    /** Requests that were sent and are not finished yet, keyed by request ID. */
    private final ConcurrentMap<Long, BatchRequest> reqs = new ConcurrentHashMap<Long, BatchRequest>();
    /** Provider of the batch that currently accumulates entries. */
    private final BatchManager batchMgr;
    /** Bounded set of entries kept on non-primary nodes; {@code null} when the cache has no backups or after stop. */
    private GridBoundedConcurrentLinkedHashSet<GridCacheRawVersionedEntry> backup;
    /** Periodic task that flushes the current batch; {@code null} when the send frequency is not positive. */
    private GridTimeoutProcessor.CancelableTask batchTimeoutTask;
    /** Busy lock guarding handler operations against concurrent stop. */
    private final GridSpinBusyLock busyLock;
    /** Value of {@code cctx.atomic()} captured at construction time. */
    private final boolean atomic;
    /** Number of entries at which a batch is considered ready to be sent (ignored when not positive). */
    private final int batchSendSize;
    /** Size in bytes above which a batch is considered ready to be sent (ignored when not positive). */
    private final int batchSendSizeBytes;
    /** When {@code true} every request is marked as forced before it is sent. */
    private volatile boolean disableThrottling = IgniteSystemProperties.getBoolean("IGNITE_DISABLE_SMART_DR_THROTTLING", false);

    /**
     * Creates a replication handler for a single cache.
     *
     * @param cctx Cache context, must not be {@code null}.
     * @param mgr Owning replication manager, must not be {@code null}.
     * @param drProc Replication processor; only its GridGain configuration is read here.
     * @param busyLock Busy lock guarding handler operations.
     * @param ccfg Sender configuration, must not be {@code null}.
     */
    public CacheDrHandler(GridCacheContext cctx, GridGainCacheDrManager mgr, DrProcessor drProc, GridSpinBusyLock busyLock, CacheDrSenderConfiguration ccfg) {
        assert cctx != null;
        assert mgr != null;
        assert ccfg != null;

        this.cctx = cctx;
        this.mgr = mgr;
        this.ccfg = ccfg;
        this.busyLock = busyLock;
        this.log = cctx.logger(CacheDrHandler.class);
        this.atomic = cctx.atomic();
        this.topic = CU.replicationTopicReceive(cctx.name());
        this.batchMgr = new BatchManager(ccfg.getMaxBatches());
        this.batchSendSize = ccfg.getBatchSendSize();
        this.batchSendSizeBytes = drProc.ggConfig().getBatchSendSizeBytes();

        boolean needsBackupQueue = cctx.isReplicated() || cctx.config().getBackups() > 0;

        if (!needsBackupQueue) {
            this.backup = null;
        } else {
            int capacity = ccfg.getMaxBackupQueueSize();

            // No explicit limit configured: derive one from batch settings and the number of backups.
            if (capacity <= 0) {
                int backupCnt = cctx.isReplicated() ? 2 : cctx.config().getBackups();
                int batchLimit = ccfg.getMaxBatches();

                if (batchLimit <= 0) {
                    batchLimit = 32;
                }

                capacity = batchLimit * this.batchSendSize * backupCnt * 3 / 2;
            }

            this.backup = new GridBoundedConcurrentLinkedHashSet<>(capacity, 64, 0.75f, Runtime.getRuntime().availableProcessors());
        }

        String name = cctx.name();

        assert CacheType.cacheType(name) == CacheType.USER : "replication of inner system caches is deprecated, cacheName=" + name;
    }

    /**
     * Overrides the throttling switch initially read from the {@code IGNITE_DISABLE_SMART_DR_THROTTLING} property.
     *
     * @param disableThrottling {@code true} to mark every outgoing request as forced.
     */
    void setDisableThrottling(boolean disableThrottling) {
        // Volatile write: becomes visible to sending threads without extra synchronization.
        this.disableThrottling = disableThrottling;
    }

    /**
     * Starts the handler: schedules the periodic batch flush (when a positive send frequency is configured)
     * and registers the listener that processes responses to internal replication requests.
     */
    void onStart() {
        long flushPeriod = this.ccfg.getBatchSendFrequency();

        if (flushPeriod > 0L) {
            this.batchTimeoutTask = this.cctx.kernalContext().timeout().schedule(this::sendBatchOnTimeout, flushPeriod, flushPeriod);
        }

        GridMessageListener respLsnr = new GridMessageListener() {
            @Override
            public void onMessage(UUID nodeId, Object msg, byte plc) {
                if (!(msg instanceof DrInternalResponse)) {
                    assert false : "Unexpected message type: " + msg;

                    return;
                }

                if (CacheDrHandler.this.log.isDebugEnabled()) {
                    CacheDrHandler.this.log.debug("Received internal replication response message [sourceNodeId=" + nodeId + ", msg=" + msg + ']');
                }

                DrInternalResponse resp = (DrInternalResponse)msg;
                Throwable respErr;

                try {
                    resp.finishUnmarshal(CacheDrHandler.this.cctx.marshaller(), U.resolveClassLoader(CacheDrHandler.this.cctx.gridConfig()));

                    respErr = resp.error();
                }
                catch (IgniteCheckedException e) {
                    // A response that cannot be unmarshalled is dropped; the request stays pending.
                    U.error(CacheDrHandler.this.log, "Failed to unmarshal message (will ignore): " + resp, e);

                    return;
                }

                CacheDrHandler.this.onReplicationResponse(resp.id(), resp.code(), nodeId, respErr);
            }
        };

        this.cctx.gridIO().addMessageListener(this.topic, respLsnr);
    }

    /**
     * Stops the handler: closes the periodic flush task, cancels the batch manager, drops the backup queue,
     * unregisters the response listener and finishes every pending request as {@link CacheDrResultType#IGNORED}.
     */
    void onKernalStop() {
        GridTimeoutProcessor.CancelableTask flushTask = this.batchTimeoutTask;

        if (flushTask != null) {
            flushTask.close();
        }

        this.batchMgr.cancel();

        this.backup = null;

        this.cctx.kernalContext().io().removeMessageListener(this.topic);

        // onFinish() removes the request from the map itself; the map tolerates removal while being iterated.
        for (BatchRequest pending : this.reqs.values()) {
            pending.onFinish(CacheDrResultType.IGNORED);
        }

        this.reqs.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a cache update that may have to be replicated.
     * <p>
     * Primary and load updates are added to the current batch. Backup and preload updates are either
     * batched (atomic cache, local node is first in the affinity list) or remembered in the backup queue.
     *
     * @param entry Updated entry, must not be {@code null}.
     * @param drType Replication type of the update, must not be {@code null}.
     * @param topVer Topology version used to resolve affinity nodes for the key.
     */
    void onReplicate(GridCacheRawVersionedEntry entry, GridDrType drType, AffinityTopologyVersion topVer) {
        assert entry != null;
        assert drType != null;

        if (!this.enterBusy()) {
            return;
        }

        try {
            if (this.mgr.stopped()) {
                if (this.log.isDebugEnabled()) {
                    LT.info(this.log, "Skipped replication because either no sender hubs are available or the grid is stopping.");
                }

                return;
            }

            DrSenderAttributes hubAttrs = this.mgr.sendHubAttributes();

            if (hubAttrs == null) {
                if (this.log.isDebugEnabled()) {
                    LT.info(this.log, "Skipped replication because either no sender hubs are available or the grid is stopping.");
                }

                return;
            }

            // Updates that originate from an ignored data center are not replicated.
            if (hubAttrs.getIgnoreList().contains(entry.version().dataCenterId())) {
                return;
            }

            switch (drType) {
                case DR_PRIMARY:
                case DR_LOAD: {
                    this.replicateShared(entry);

                    break;
                }

                case DR_BACKUP: {
                    List<ClusterNode> affNodes = this.cctx.affinity().nodesByKey(entry.getKey(), topVer);

                    boolean sndFromHere = this.atomic && F.eq(F.first(affNodes), this.cctx.localNode());

                    if (sndFromHere) {
                        this.replicateShared(entry);
                    } else if (this.backup != null && this.addBackup(affNodes)) {
                        this.backup.add(entry);
                    }

                    break;
                }

                case DR_PRELOAD: {
                    if (this.backup != null) {
                        List<ClusterNode> affNodes = this.cctx.affinity().nodesByKey(entry.getKey(), topVer);
                        ClusterNode loc = this.cctx.localNode();

                        // Only a non-primary affinity node keeps preloaded entries in the backup queue.
                        if (affNodes.contains(loc) && !F.eq(F.first(affNodes), loc) && this.addBackup(affNodes)) {
                            this.backup.add(entry);
                        }
                    }

                    break;
                }

                default: {
                    assert false;
                }
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Decides whether the local node should keep an entry in the backup queue.
     * <p>
     * For non-replicated caches, or when there are fewer than three affinity nodes, the answer is always
     * {@code true}. Otherwise only the second and the third affinity nodes keep the entry.
     *
     * @param affNodes Affinity nodes for the key, in affinity order.
     * @return {@code true} if the entry should be added to the local backup queue.
     */
    private boolean addBackup(Collection<ClusterNode> affNodes) {
        boolean limited = this.cctx.isReplicated() && affNodes.size() >= 3;

        if (!limited) {
            return true;
        }

        Iterator<ClusterNode> it = affNodes.iterator();

        // Skip the first affinity node.
        it.next();

        ClusterNode second = it.next();

        if (second.equals(this.cctx.localNode())) {
            return true;
        }

        ClusterNode third = it.next();

        return third.equals(this.cctx.localNode());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Routes a response to the pending request with the given ID. Responses for unknown requests,
     * or arriving while the handler is stopping, are dropped.
     *
     * @param id Request ID.
     * @param code Response code.
     * @param nodeId ID of the node that sent the response.
     * @param err Error reported by the responding node, if any.
     */
    public void onReplicationResponse(long id, byte code, UUID nodeId, @Nullable Throwable err) {
        if (!this.busyLock.enterBusy()) {
            return;
        }

        try {
            BatchRequest pending = this.reqs.get(id);

            if (pending == null) {
                return;
            }

            pending.onResponse(nodeId, code, err);
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reacts to a topology exchange: flushes the current batch and, when the {@code left} flag is set,
     * sends backup-queue entries for which the local node became primary.
     *
     * @param topVer Topology version of the exchange.
     * @param left Whether backup-queue entries should be checked and sent.
     * @param skipSnd If {@code true} nothing is sent.
     * @throws IgniteCheckedException Declared by the signature; not thrown directly by this body.
     */
    void onExchange(AffinityTopologyVersion topVer, boolean left, boolean skipSnd) throws IgniteCheckedException {
        if (!this.enterBusy()) {
            return;
        }

        try {
            if (skipSnd) {
                return;
            }

            Batch cur = this.batchMgr.tryGetCurrentBatch();

            if (cur != null && cur.denyAdditions()) {
                this.sendBatch(cur);
            }

            if (left && this.backup != null) {
                this.sendBackups(topVer);
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes from the backup queue all entries whose keys belong to the evicted partition.
     *
     * @param part Evicted partition number.
     */
    void onPartitionEvicted(int part) {
        if (!this.enterBusy()) {
            return;
        }

        try {
            if (this.backup == null || this.mgr.stopped()) {
                return;
            }

            for (Iterator<GridCacheRawVersionedEntry> it = this.backup.iterator(); it.hasNext(); ) {
                GridCacheRawVersionedEntry queued = it.next();

                if (this.cctx.affinity().partition(queued.getKey()) == part) {
                    it.remove();
                }
            }
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends already buffered entries as a single internal request (full state transfer path).
     *
     * @param dataCenterIds Target data center IDs passed to the request.
     * @param entryBuffers Entry buffers keyed by data center ID; they are flushed and closed here.
     * @param permitFunction Function that provides a permit for a request of the given size.
     * @param syncFst Sync FST flag passed to the created request.
     * @param fstId Full state transfer ID, if any.
     * @return Future of the request result; already finished with {@link CacheDrResultType#IGNORED}
     *      when the handler is stopping.
     * @throws IgniteCheckedException If the permit function fails.
     */
    IgniteInternalFuture<CacheDrResultType> fullStateTransferReplicate(Collection<Byte> dataCenterIds, Map<Byte, EntryBuffer> entryBuffers, IgniteThrowableFunction<Integer, Permit> permitFunction, boolean syncFst, @Nullable IgniteUuid fstId) throws IgniteCheckedException {
        if (!this.enterBusy()) {
            return new GridFinishedFuture<CacheDrResultType>(CacheDrResultType.IGNORED);
        }

        try {
            DrInternalRequest internalReq = this.createInternalRequest(entryBuffers, dataCenterIds, fstId);

            Permit permit = permitFunction.apply(DrUtils.size(internalReq));

            BatchRequest batchReq = new BatchRequest(internalReq, permit, syncFst);

            batchReq.send();

            return batchReq.future();
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Adds the entry to the current batch, retrying with a fresh batch whenever the current one no longer
     * accepts additions, and sends the batch once it is ready. Gives up only when no batch is available
     * and the manager is stopped.
     *
     * @param entry Entry to replicate.
     */
    private void replicateShared(GridCacheRawVersionedEntry entry) {
        for (;;) {
            Batch cur = this.batchMgr.getCurrentBatch();

            if (cur == null) {
                if (this.mgr.stopped()) {
                    return;
                }

                continue;
            }

            if (!cur.add(entry)) {
                // The batch was closed for additions concurrently; drop it and retry with the next one.
                this.batchMgr.discardBatch(cur);

                continue;
            }

            if (cur.readyToSend() && cur.denyAdditions()) {
                this.sendBatch(cur);
            }

            return;
        }
    }

    /**
     * Turns the batch into a request and sends it. If that fails, the batch's reservation is released
     * and the failure is rethrown after conversion.
     *
     * @param batch Batch that is already closed for additions.
     */
    private void sendBatch(Batch batch) {
        try {
            batch.createRequest();
        }
        catch (Throwable failure) {
            // Give the batch slot back before propagating, otherwise it would stay reserved.
            batch.release();

            throw U.convertException(U.cast(failure));
        }
    }

    /**
     * Moves entries for which the local node is primary on the given topology version out of the backup
     * queue and sends them in unreserved batches. Stops early once the manager is stopped.
     *
     * @param topVer Topology version used for the primary check.
     */
    private void sendBackups(AffinityTopologyVersion topVer) {
        assert this.backup != null;

        if (this.mgr.stopped()) {
            return;
        }

        Batch pending = null;

        Iterator<GridCacheRawVersionedEntry> it = this.backup.iterator();

        while (!this.mgr.stopped() && it.hasNext()) {
            GridCacheRawVersionedEntry queued = it.next();

            boolean locPrimary = this.cctx.affinity().primaryByKey(this.cctx.localNode(), queued.getKey(), topVer);

            if (locPrimary) {
                it.remove();

                if (pending == null) {
                    pending = new Batch(false);
                }

                pending.add(queued);

                if (pending.readyToSend()) {
                    pending.denyAdditions();

                    this.sendBatch(pending);

                    pending = null;
                }
            }
        }

        // Flush the partially filled tail batch.
        if (!this.mgr.stopped() && pending != null) {
            pending.denyAdditions();

            this.sendBatch(pending);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * @return Total number of entries in requests that are not finished yet, or {@code 0} when the
     *      handler is stopping.
     */
    int queuedKeysCount() {
        if (!this.enterBusy()) {
            return 0;
        }

        try {
            return this.reqs.values().stream().mapToInt(BatchRequest::entriesCount).sum();
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * @return Size of the backup queue, or {@code 0} when there is no backup queue or the handler is stopping.
     */
    int backupQueueSize() {
        if (!this.enterBusy()) {
            return 0;
        }

        try {
            if (this.backup == null) {
                return 0;
            }

            return this.backup.sizex();
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * @return Number of batches reported as acquired by the batch manager, or {@code 0} when the handler
     *      is stopping.
     */
    int batchWaitingSendCount() {
        if (!this.enterBusy()) {
            return 0;
        }

        try {
            return this.batchMgr.batchesAcquired();
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * @return Number of requests that are not finished yet, or {@code 0} when the handler is stopping.
     */
    int batchWaitingAcknowledgeCount() {
        if (!this.enterBusy()) {
            return 0;
        }

        try {
            return this.reqs.size();
        }
        finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Tries to enter the busy state, logging at debug level when that is not possible.
     *
     * @return {@code true} if the busy lock was acquired; the caller must then call {@code leaveBusy()}.
     */
    private boolean enterBusy() {
        boolean entered = this.busyLock.enterBusy();

        if (!entered && this.log.isDebugEnabled()) {
            this.log.debug("Failed to perform action on replication handler because it is stopping.");
        }

        return entered;
    }

    /**
     * Notifies every pending request that the given sender hubs have left. The notification is submitted
     * through {@code runLocalSafe}; nothing is submitted when there are no pending requests.
     *
     * @param sndHubIds IDs of the sender hubs that left.
     */
    void onSenderHubsLeave(final Collection<UUID> sndHubIds) {
        if (this.reqs.isEmpty()) {
            return;
        }

        GridPlainRunnable notifier = new GridPlainRunnable() {
            @Override
            public void run() {
                if (!CacheDrHandler.this.busyLock.enterBusy()) {
                    return;
                }

                try {
                    for (BatchRequest pending : CacheDrHandler.this.reqs.values()) {
                        pending.onHubsLeave(sndHubIds);
                    }
                }
                catch (Error | RuntimeException e) {
                    // Log before rethrowing so the failure is recorded in this handler's log.
                    U.error(CacheDrHandler.this.log, "Unexpected runtime exception.", e);

                    throw e;
                }
                finally {
                    CacheDrHandler.this.busyLock.leaveBusy();
                }
            }
        };

        this.cctx.closures().runLocalSafe(notifier);
    }

    /** {@inheritDoc} */
    @Override
    public String toString() {
        return S.toString(CacheDrHandler.class, this);
    }

    /**
     * Groups serialized entries by data center ID into entry buffers and wraps them into a batch request.
     * <p>
     * The buffer of the previously seen data center is reused for consecutive entries of the same data
     * center, so the map is consulted only when the data center ID changes.
     *
     * @param entries Serialized entries to send.
     * @param permit Permit to release when the request finishes, if any.
     * @return Request that is ready to be sent (not yet registered or sent).
     */
    BatchRequest createBatchRequest(Collection<SerializedDrEntry> entries, @Nullable Permit permit) {
        Map<Byte, EntryBuffer> buffers = new HashMap<>();

        byte curDc = -1;
        EntryBuffer curBuf = null;

        for (SerializedDrEntry e : entries) {
            byte dcId = e.dcID();

            // The null check covers a first entry whose data center ID equals the initial value of curDc.
            if (curBuf == null || dcId != curDc) {
                curBuf = buffers.computeIfAbsent(dcId, k -> new EntryBuffer(this.cctx));

                // Remember the data center so that consecutive entries skip the map lookup.
                curDc = dcId;
            }

            curBuf.writeEntry(e);
        }

        return new BatchRequest(this.createInternalRequest(buffers, null, null), permit, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds an internal request from the given entry buffers. Every buffer is flushed and converted into
     * a request entry; all buffers are closed quietly afterwards, whether or not the conversion succeeded.
     *
     * @param entryBuffers Entry buffers keyed by data center ID.
     * @param dataCenterIds Data center IDs passed to the request, if any.
     * @param fstId Full state transfer ID, if any.
     * @return New request with a freshly generated ID.
     */
    private DrInternalRequest createInternalRequest(Map<Byte, EntryBuffer> entryBuffers, @Nullable Collection<Byte> dataCenterIds, @Nullable IgniteUuid fstId) {
        ArrayList<DrInternalRequestEntry> reqEntries = new ArrayList<DrInternalRequestEntry>(entryBuffers.size());
        int total = 0;

        try {
            for (Map.Entry<Byte, EntryBuffer> perDc : entryBuffers.entrySet()) {
                EntryBuffer dcBuf = perDc.getValue();

                dcBuf.flush();

                reqEntries.add(new DrInternalRequestEntry(perDc.getKey(), dcBuf.entriesCnt(), dcBuf.getBytes(), dcBuf.sizeBytes()));

                total += dcBuf.entriesCnt();
            }
        }
        finally {
            for (EntryBuffer dcBuf : entryBuffers.values()) {
                U.closeQuiet(dcBuf);
            }
        }

        return new DrInternalRequest(ID_GEN.incrementAndGet(), this.cctx.name(), dataCenterIds, reqEntries, total, fstId);
    }

    /**
     * Periodic task body: sends the current batch if one exists, it is ready to be sent and this call is
     * the one that closes it for additions.
     */
    private void sendBatchOnTimeout() {
        assert this.ccfg.getBatchSendFrequency() > 0L;

        Batch cur = this.batchMgr.tryGetCurrentBatch();

        if (cur == null) {
            return;
        }

        if (cur.readyToSend() && cur.denyAdditions()) {
            this.sendBatch(cur);
        }
    }

    /**
     * Accumulates serialized entries until the batch is closed for additions and turned into a request.
     * Additions run under the read lock, closing and request creation under the write lock. The batch
     * also acts as the {@link Permit} that returns its reservation to the batch manager.
     */
    private class Batch
    implements Permit {
        /** Creation timestamp, used for the send-frequency check. */
        private final long created = U.currentTimeMillis();
        /** Accumulated entries; set to {@code null} once the request has been created. */
        private Collection<SerializedDrEntry> entries = new ConcurrentLinkedQueue<SerializedDrEntry>();
        /** Number of accumulated entries. */
        private final AtomicInteger entriesCnt = new AtomicInteger();
        /** Sum of the sizes of accumulated entries. */
        private final AtomicInteger batchSize = new AtomicInteger();
        /** Whether this batch still holds a reservation in the batch manager. */
        private final AtomicBoolean reserved;
        /** Lock separating additions (read) from closing and request creation (write). */
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        /** Set once the batch is closed for additions; accessed under {@link #lock}. */
        private boolean noAdd;

        /**
         * @param reserved Whether the batch holds a reservation that must be released later.
         */
        public Batch(boolean reserved) {
            this.reserved = new AtomicBoolean(reserved);
        }

        /**
         * Serializes the entry and appends it to the batch.
         *
         * @param entry Entry to add.
         * @return {@code false} if the batch is already closed for additions.
         */
        boolean add(GridCacheRawVersionedEntry entry) {
            this.lock.readLock().lock();

            try {
                if (this.noAdd) {
                    return false;
                }

                SerializedDrEntry serialized = this.serializeEntry(entry);

                this.entries.add(serialized);
                this.entriesCnt.incrementAndGet();
                this.batchSize.addAndGet(serialized.size());

                return true;
            }
            finally {
                this.lock.readLock().unlock();
            }
        }

        /**
         * Marshals the entry and writes it into a byte array.
         *
         * @param entry Entry to serialize.
         * @return Serialized entry tagged with the data center ID of the entry version.
         * @throws IgniteException If marshalling or writing fails.
         */
        SerializedDrEntry serializeEntry(GridCacheRawVersionedEntry entry) {
            try {
                entry.marshal(CacheDrHandler.this.cctx.cacheObjectContext(), CacheDrHandler.this.cctx.gridConfig().getMarshaller());

                GridByteArrayOutputStream bytes = new GridByteArrayOutputStream(DrUtils.drEntrySize(entry));

                try (DataOutputStream dataOut = new DataOutputStream(bytes)) {
                    DrUtils.writeDrEntry(dataOut, entry);
                }

                // The stream is pre-sized, so its internal array is expected to be exactly the entry size.
                assert DrUtils.drEntrySize(entry) == bytes.internalArray().length;

                return new SerializedDrEntry(entry.version().dataCenterId(), bytes.internalArray());
            }
            catch (IOException | IgniteCheckedException e) {
                throw new IgniteException("Failed to marshal data for replication.", e);
            }
        }

        /**
         * Closes the batch for additions.
         *
         * @return {@code true} only for the call that actually closed the batch.
         */
        boolean denyAdditions() {
            this.lock.writeLock().lock();

            try {
                boolean wasOpen = !this.noAdd;

                this.noAdd = true;

                return wasOpen;
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }

        /** {@inheritDoc} */
        @Override
        public boolean release() {
            boolean released = this.reserved.compareAndSet(true, false);

            if (released) {
                CacheDrHandler.this.batchMgr.release();
            }

            return released;
        }

        /**
         * @return {@code true} if the entry-count limit is reached, the byte-size limit is exceeded, or the
         *      batch is non-empty and older than the configured send frequency.
         */
        boolean readyToSend() {
            int cnt = this.entriesCnt.get();

            if (CacheDrHandler.this.batchSendSize > 0 && cnt >= CacheDrHandler.this.batchSendSize) {
                return true;
            }

            if (CacheDrHandler.this.batchSendSizeBytes > 0 && this.batchSize.get() > CacheDrHandler.this.batchSendSizeBytes) {
                return true;
            }

            if (cnt <= 0) {
                return false;
            }

            return CacheDrHandler.this.ccfg.getBatchSendFrequency() > 0L
                && U.currentTimeMillis() - this.created >= CacheDrHandler.this.ccfg.getBatchSendFrequency();
        }

        /**
         * Detaches the accumulated entries and sends them as a batch request that uses this batch as its
         * permit. The batch must already be closed for additions.
         */
        void createRequest() {
            Collection<SerializedDrEntry> detached;

            this.lock.writeLock().lock();

            try {
                assert this.noAdd;

                detached = this.entries;

                this.entries = null;
            }
            finally {
                this.lock.writeLock().unlock();
            }

            CacheDrHandler.this.createBatchRequest(detached, this).send();
        }
    }

    private class BatchRequest {
        private final DrInternalRequest req;
        private final Permit permit;
        private final Map<UUID, Throwable> failedHubs = new HashMap<UUID, Throwable>();
        private final GridFutureAdapter<CacheDrResultType> fut = new GridFutureAdapter();
        private UUID hubId;
        private long delay;
        private Boolean syncFst;

        public BatchRequest(@Nullable DrInternalRequest req, @Nullable Permit permit, Boolean syncFst) {
            this.req = req;
            this.permit = permit;
            this.syncFst = syncFst;
        }

        public int entriesCount() {
            return this.req.entryCount();
        }

        IgniteInternalFuture<CacheDrResultType> future() {
            return this.fut;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void send() {
            BatchRequest old = CacheDrHandler.this.reqs.putIfAbsent(this.req.id(), this);
            assert (old == null || old == this);
            while (!this.fut.isDone()) {
                ClusterNode hub;
                BatchRequest batchRequest = this;
                synchronized (batchRequest) {
                    if (this.hubId != null) {
                        break;
                    }
                    hub = CacheDrHandler.this.mgr.nextHub(this.failedHubs.keySet());
                    if (hub != null && Boolean.TRUE.equals(this.syncFst) && !Boolean.TRUE.equals(hub.attribute("plugins.gg.replication.snd.fst.buffer.supported"))) {
                        this.failedHubs.put(hub.id(), new IgniteCheckedException("Failed to send replication batch because sender hub does not support sync FST: " + hub.id()));
                        continue;
                    }
                    if (hub != null) {
                        this.hubId = hub.id();
                    }
                }
                if (hub == null) {
                    if (CacheDrHandler.this.log.isTraceEnabled()) {
                        CacheDrHandler.this.log.trace("Failed to send internal DR request: " + this.req.id());
                    }
                    this.onFinish(CacheDrResultType.IGNORED);
                    break;
                }
                boolean resend = false;
                try {
                    if (CacheDrHandler.this.disableThrottling) {
                        this.req.force(true);
                    }
                    CacheDrHandler.this.cctx.gridIO().sendToCustomTopic(hub.id(), (Object)CU.replicationTopicSend(), (Message)this.req, (byte)33);
                    CacheDrHandler.this.mgr.metrics0().onSenderCacheBatchSent(this.req.entryCount());
                    if (CacheDrHandler.this.cctx.discovery().node(hub.id()) == null) {
                        this.failedHubs.put(hub.id(), new IgniteCheckedException("Failed to send replication batch because sender hub has left the grid: " + hub.id()));
                        resend = true;
                    }
                }
                catch (IgniteCheckedException e) {
                    if (!(e instanceof ClusterTopologyCheckedException)) {
                        U.error(CacheDrHandler.this.log, "Failed to send replication batch [hubId=" + hub.id() + ", batch=" + this.req + ']', e);
                    } else if (CacheDrHandler.this.log.isDebugEnabled()) {
                        CacheDrHandler.this.log.debug("Failed to send replication batch [hubId=" + hub.id() + ", batch=" + this.req + "]. Will retry with another sender hub.");
                    }
                    this.failedHubs.put(hub.id(), e);
                    resend = true;
                }
                if (!resend) {
                    if (!CacheDrHandler.this.log.isTraceEnabled()) break;
                    CacheDrHandler.this.log.trace("Internal DR request sent: " + this.req.id());
                    break;
                }
                BatchRequest batchRequest2 = this;
                synchronized (batchRequest2) {
                    if (!F.eq(this.hubId, hub.id())) {
                        break;
                    }
                    this.hubId = null;
                    if (CacheDrHandler.this.log.isTraceEnabled()) {
                        CacheDrHandler.this.log.trace("Internal DR request will be resent: req=" + this.req.id());
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void onResponse(UUID nodeId, byte code, @Nullable Throwable err) {
            if (code == 1 || err != null) {
                long delay0;
                boolean snd = false;
                BatchRequest batchRequest = this;
                synchronized (batchRequest) {
                    if (err != null) {
                        if (CacheDrHandler.this.log.isTraceEnabled()) {
                            CacheDrHandler.this.log.trace("Internal DR request failed: req=" + this.req.id() + ", node=" + nodeId + ", reason=" + err.getMessage());
                        }
                        this.failedHubs.put(nodeId, err);
                    }
                    if (F.eq(this.hubId, nodeId)) {
                        if (code == 1) {
                            long l = this.delay = this.delay == 0L ? 100L : Math.min((long)((double)this.delay * 1.5), 10000L);
                            if (this.delay > 500L && this.req.dataCenterIds() == null) {
                                this.req.force(true);
                            }
                        }
                        this.hubId = null;
                        snd = true;
                    }
                    delay0 = this.delay;
                }
                if (!snd) {
                    return;
                }
                if (code == 1) {
                    if (CacheDrHandler.this.log.isTraceEnabled()) {
                        CacheDrHandler.this.log.trace("Internal DR request rescheduled: req=" + this.req.id() + ", node=" + nodeId + ", delay=" + delay0);
                    }
                    CacheDrHandler.this.cctx.time().schedule(this::send, delay0, -1L);
                } else {
                    this.send();
                }
            } else {
                if (code == 2) {
                    if (CacheDrHandler.this.log.isTraceEnabled()) {
                        CacheDrHandler.this.log.trace("Internal DR request responded with unsupported code: req=" + this.req.id() + ", node=" + nodeId + ", code=" + code);
                    }
                    return;
                }
                this.delay = 0L;
                if (CacheDrHandler.this.log.isTraceEnabled()) {
                    CacheDrHandler.this.log.trace("Internal DR request ack received: req=" + this.req.id() + ", node=" + nodeId);
                }
                this.onFinish(CacheDrResultType.ACKNOWLEDGED);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Reacts to a set of sender hubs leaving. When this request is currently bound to one of
         * the given hubs, the binding is cleared and {@code send()} is invoked again.
         *
         * @param sndHubIds IDs of the hubs that left; asserted to be non-empty.
         */
        void onHubsLeave(Collection<UUID> sndHubIds) {
            assert !F.isEmpty(sndHubIds);

            boolean resend;

            synchronized (this) {
                resend = hubId != null && sndHubIds.contains(hubId);

                if (resend) {
                    hubId = null;
                }
            }

            // send() is invoked only after the synchronized block above has been left.
            if (resend) {
                send();
            }
        }

        /**
         * Completes this request with the given result. Does nothing when the future is
         * already done.
         * <p>
         * On first completion the permit (if any) is released, the request is removed from the
         * handler's request map, the future is completed and sender metrics are updated. An
         * {@code IGNORED} result is turned into {@code FAILED} when failed hubs were recorded;
         * in that case the manager is notified unless it or the kernal is stopping.
         *
         * @param resType Requested result type; asserted to be non-null.
         */
        private synchronized void onFinish(CacheDrResultType resType) {
            assert resType != null;

            if (fut.isDone()) {
                return;
            }

            if (permit != null) {
                permit.release();
            }

            boolean removed = CacheDrHandler.this.reqs.remove(req.id(), this);

            // An acknowledged request must still have been registered.
            assert resType != CacheDrResultType.ACKNOWLEDGED || removed;

            CacheDrResultType finalType = resType;

            if (resType == CacheDrResultType.IGNORED && !F.isEmpty(failedHubs)) {
                finalType = CacheDrResultType.FAILED;

                boolean stopping = CacheDrHandler.this.mgr.stopped()
                    || CacheDrHandler.this.cctx.kernalContext().isStopping();

                if (!stopping) {
                    CacheDrHandler.this.mgr.onBatchFailed(failedHubs);
                }
            }

            fut.onDone(finalType);

            if (finalType == CacheDrResultType.ACKNOWLEDGED) {
                CacheDrHandler.this.mgr.metrics0().onSenderCacheBatchAcknowledged(req.entryCount());
            } else if (finalType == CacheDrResultType.FAILED) {
                CacheDrHandler.this.mgr.metrics0().onSenderCacheBatchFailed(req.entryCount());
            }
        }
    }

    /**
     * Holds the current {@code Batch} and bounds the number of batches that may be acquired
     * at the same time.
     * <p>
     * The current batch is kept in an atomic reference; creation of a new batch is serialized
     * on {@code getMux}. The acquired-batch counter and the closed flag are guarded by
     * {@code acquireMux}, which is also used to wait for and signal free slots.
     */
    private class BatchManager {
        /** Current batch, {@code null} when no batch is installed. */
        private final AtomicReference<Batch> curBatch = new AtomicReference<>();

        /** Monitor serializing creation of the current batch. */
        private final Object getMux = new Object();

        /** Monitor guarding {@link #curCnt} and {@link #closed}. */
        private final Object acquireMux = new Object();

        /** Upper bound on acquired batches; {@code 0} means no bound is enforced. */
        private final long maxCnt;

        /** Number of currently acquired batches. Guarded by {@link #acquireMux}. */
        private long curCnt;

        /** Set by {@link #cancel()}; once set, {@link #acquire(long)} always fails. */
        private boolean closed;

        /**
         * @param maxCnt Maximum number of acquired batches; negative values are treated as {@code 0}.
         */
        BatchManager(int maxCnt) {
            this.maxCnt = Math.max(maxCnt, 0);
        }

        /**
         * Returns the current batch, creating one when none is installed and a slot can be
         * acquired within 500 ms.
         *
         * @return Current batch, or {@code null} if no batch is installed and a slot could not
         *      be acquired (timeout elapsed or the manager was cancelled).
         * @throws IgniteInterruptedException If the thread is interrupted while waiting for a slot.
         */
        @Nullable
        Batch getCurrentBatch() {
            Batch batch = curBatch.get();

            if (batch != null) {
                return batch;
            }

            synchronized (getMux) {
                // Re-check under the monitor: another thread may have installed a batch meanwhile.
                batch = curBatch.get();

                if (batch == null && acquire(500L)) {
                    batch = new Batch(true);

                    Batch oldBatch = curBatch.getAndSet(batch);

                    assert oldBatch == null;
                }
            }

            return batch;
        }

        /**
         * @return Current batch without attempting to create one; may be {@code null}.
         */
        @Nullable
        Batch tryGetCurrentBatch() {
            return curBatch.get();
        }

        /**
         * Clears the current batch if it is still the given one.
         *
         * @param batch Batch to discard.
         */
        void discardBatch(Batch batch) {
            curBatch.compareAndSet(batch, null);
        }

        /**
         * Tries to take one slot, waiting up to {@code timeout} milliseconds for one to free up.
         *
         * @param timeout Maximum wait time in milliseconds; asserted to be positive.
         * @return {@code true} if a slot was taken, {@code false} if the timeout elapsed or the
         *      manager was cancelled.
         * @throws IgniteInterruptedException If the thread is interrupted while waiting; the
         *      interrupt flag is restored and the original exception is attached as the cause.
         */
        private boolean acquire(long timeout) {
            assert timeout > 0L;

            synchronized (acquireMux) {
                long endTime = U.currentTimeMillis() + timeout;

                while (!closed) {
                    if (maxCnt == 0L || curCnt < maxCnt) {
                        ++curCnt;

                        return true;
                    }

                    long remaining = endTime - U.currentTimeMillis();

                    if (remaining <= 0L) {
                        break;
                    }

                    try {
                        acquireMux.wait(remaining);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();

                        // Keep the original exception as the cause so the stack trace is not lost.
                        throw new IgniteInterruptedException("Interrupted while waiting for batch acquire.", e);
                    }
                }

                return false;
            }
        }

        /**
         * Returns one slot and wakes up all threads waiting in {@link #acquire(long)}.
         */
        void release() {
            synchronized (acquireMux) {
                --curCnt;

                assert curCnt >= 0L;

                acquireMux.notifyAll();
            }
        }

        /**
         * Marks the manager as closed and wakes up all waiting threads so that pending and
         * future {@link #acquire(long)} calls fail.
         */
        void cancel() {
            synchronized (acquireMux) {
                closed = true;

                acquireMux.notifyAll();
            }
        }

        /**
         * @return Number of currently acquired batches, saturated to {@link Integer#MAX_VALUE}
         *      when the counter does not fit into a non-negative {@code int}.
         */
        int batchesAcquired() {
            synchronized (acquireMux) {
                // Compare as long before narrowing: a plain (int) cast can wrap a large counter
                // into a small positive value that a sign check would not catch.
                if (curCnt < 0L || curCnt > Integer.MAX_VALUE) {
                    return Integer.MAX_VALUE;
                }

                return (int)curCnt;
            }
        }
    }
}

