package org.gridgain.grid.internal.processors.cache.dr.ist;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.gridgain.grid.internal.interop.InteropUtils;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMetrics;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrResultType;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequestEntry;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalResponse;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheDrBatchManager.class */
public class CacheDrBatchManager {
    /** Counter incremented in {@code createInternalRequest} to produce the id of every new request. */
    private static final AtomicLong ID_GEN;
    /** Random id (assigned in the static initializer) that the two-argument {@code send} overload passes as the {@code igniteUuid} argument. */
    public static final IgniteUuid FAKE_FST_ID;
    /** NOTE(review): appears to be the first resend back-off delay applied by {@code BatchRequest.onResponse}; units presumably milliseconds - confirm. */
    static final int INITIAL_DELAY = 100;
    /** NOTE(review): appears to be the upper bound of the resend back-off delay; units presumably milliseconds - confirm. */
    static final int MAX_DELAY = 10000;
    /** Cache context; used here for messaging, discovery lookups, scheduling, closures and the cache name. */
    private final GridCacheContext cctx;
    /** Logger supplied by the owner of this manager. */
    private final IgniteLogger log;
    /** Topic the response listener is registered on in {@code start()} and removed from in {@code stop()}. */
    private final Object topic;
    /** Source of the next sender hub node a batch should be sent to. */
    private final CacheSenderHubManager sndHubMgr;
    /** Requests that have been sent (or are being sent) and are not completed yet, keyed by request id. */
    private final ConcurrentMap<Long, BatchRequest> reqMap = new ConcurrentHashMap();
    /** Supplier of the metrics object that sent/acknowledged/failed/throttling events are reported to. */
    private final Supplier<CacheDrMetrics> metrics;
    /** Busy lock supplied by the owner; entered around response handling, hub-leave resends and {@code queuedKeysCount()}. */
    private final GridBusyLock busyLock;
    /** Permit accounting: a request's {@code size()} is acquired before it is sent and given back when its payload is released. */
    public final Permit permits;
    /** Decompiled form of the compiler-generated assertion flag: {@code true} when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.gridgain.grid.internal.processors.cache.dr.ist.CacheDrBatchManager$3, reason: invalid class name */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheDrBatchManager$3.class */
    /**
     * Decompiled form of the compiler-generated lookup table behind the {@code switch} over
     * {@link CacheDrResultType} in {@code BatchRequest.onDone}. The array is indexed by
     * {@code ordinal()} and holds the case number: ACKNOWLEDGED = 1, IGNORED = 2, FAILED = 3;
     * any other constant keeps the default 0 and matches no case.
     */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$gridgain$grid$internal$processors$cache$dr$CacheDrResultType = new int[CacheDrResultType.values().length];

        static {
            // Each entry is filled in its own try block so that a constant missing at run time
            // (NoSuchFieldError) only leaves that one slot unmapped.
            try {
                $SwitchMap$org$gridgain$grid$internal$processors$cache$dr$CacheDrResultType[CacheDrResultType.ACKNOWLEDGED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$gridgain$grid$internal$processors$cache$dr$CacheDrResultType[CacheDrResultType.IGNORED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$gridgain$grid$internal$processors$cache$dr$CacheDrResultType[CacheDrResultType.FAILED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: the slot stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheDrBatchManager$BatchRequest.class */
    public class BatchRequest {
        /** Id of the wrapped request (taken from {@code DrInternalRequest.id()}); key of this object in {@code reqMap}. */
        private final long reqId;
        /** Sum of {@code dataLength()} over the request entries; the number of permits this request holds. */
        private final int size;
        /** Entry count reported by the request; used for metrics and {@code queuedKeysCount()}. */
        private final int entryCount;
        /** Hubs that failed for this request (send failure, hub left, or error response); passed to {@code nextSender} when picking the next hub. */
        private final Map<UUID, Throwable> failedHubs = new HashMap();
        /** Future completed by {@code onDone} with the final result of the request. */
        private final GridFutureAdapter<CacheDrResultType> fut = new GridFutureAdapter<>();
        /** Batch callbacks: {@code onSend()}, {@code onAck()} and {@code onReject()} are invoked on it. */
        private final DrBatch lsnr;
        /** Id of the hub the request is currently sent to, or {@code null} when it is not in flight; accessed under {@code synchronized (this)}. */
        private UUID hubId;
        /** Current resend back-off delay; stays 0 until the first response with code 1. */
        private long delay;
        /** Request payload; set to {@code null} by {@code release()} once the permits are returned. */
        private volatile DrInternalRequest req;
        /** Decompiled form of the compiler-generated assertion flag: {@code true} when assertions are disabled. */
        static final /* synthetic */ boolean $assertionsDisabled;

        BatchRequest(DrInternalRequest drInternalRequest, DrBatch drBatch) {
            this.req = drInternalRequest;
            this.lsnr = drBatch;
            this.size = drInternalRequest.entries().parallelStream().mapToInt((v0) -> {
                return v0.dataLength();
            }).sum();
            this.entryCount = drInternalRequest.entryCount();
            this.reqId = drInternalRequest.id();
            this.req.force(true);
        }

        /**
         * @return number of entries the wrapped request reported when this object was created.
         */
        public int entriesCount() {
            return entryCount;
        }

        /**
         * @return future that {@code onDone} completes with the final result of this request.
         */
        IgniteInternalFuture<CacheDrResultType> future() {
            return fut;
        }

        /**
         * @return summed data length of the request entries, i.e. the permit count of this request.
         */
        int size() {
            return size;
        }

        /**
         * Reacts to a sender hub leaving: if this request is currently assigned to that hub,
         * the assignment is cleared and the request is sent again (only while the busy lock
         * can be entered).
         *
         * @param uuid id of the hub that left; must not be {@code null}.
         */
        void onHubsLeave(UUID uuid) {
            Objects.requireNonNull(uuid);

            synchronized (this) {
                if (!uuid.equals(hubId)) {
                    // The request is not in flight to the departed hub; nothing to do.
                    return;
                }

                hubId = null;

                GridBusyLock lock = CacheDrBatchManager.this.busyLock;

                if (!lock.enterBusy()) {
                    return;
                }

                try {
                    send();
                } finally {
                    lock.leaveBusy();
                }
            }
        }

        /**
         * Registers this request in {@code reqMap} (if not registered yet) and tries to deliver it
         * to a sender hub, moving on to the next hub until one attempt succeeds, no hub is left,
         * or the future is completed.
         * <ul>
         *   <li>If the payload was already released ({@code req == null}) the request completes as
         *       {@link CacheDrResultType#FAILED}.</li>
         *   <li>If the request is already assigned to a hub ({@code hubId != null}) nothing is done.</li>
         *   <li>If {@code nextSender} returns no hub the request completes as
         *       {@link CacheDrResultType#IGNORED}.</li>
         *   <li>If the send call fails, or the hub is no longer in topology afterwards, the hub is
         *       recorded in {@code failedHubs} and another hub is tried.</li>
         * </ul>
         * Previously a failed send to a hub that was still alive fell through to the success path:
         * the batch was reported as sent and kept waiting for a response that could never arrive.
         */
        void send() {
            DrInternalRequest drInternalRequest = this.req;
            BatchRequest registered = CacheDrBatchManager.this.reqMap.putIfAbsent(Long.valueOf(this.reqId), this);
            if (!$assertionsDisabled && registered != null && registered != this) {
                throw new AssertionError();
            }
            if (drInternalRequest == null) {
                onDone(CacheDrResultType.FAILED);
                return;
            }
            while (!this.fut.isDone()) {
                synchronized (this) {
                    if (this.hubId != null) {
                        // Already in flight to some hub.
                        return;
                    }
                    ClusterNode nextSender = CacheDrBatchManager.this.sndHubMgr.nextSender(this.failedHubs.keySet());
                    if (nextSender == null) {
                        if (CacheDrBatchManager.this.log.isDebugEnabled()) {
                            CacheDrBatchManager.this.log.debug("Failed to send replication batch, no sender hub found: " + drInternalRequest.id());
                        }
                        onDone(CacheDrResultType.IGNORED);
                        return;
                    }
                    UUID targetId = nextSender.id();
                    this.hubId = targetId;
                    boolean sent = false;
                    try {
                        CacheDrBatchManager.this.cctx.gridIO().sendToCustomTopic(targetId, CU.replicationTopicSend(), drInternalRequest, (byte) 33);
                        sent = true;
                    } catch (IgniteCheckedException e) {
                        if (!(e instanceof ClusterTopologyCheckedException)) {
                            U.error(CacheDrBatchManager.this.log, "Failed to send replication batch [hubId=" + targetId + ", batch=" + drInternalRequest + ']', e);
                        } else if (CacheDrBatchManager.this.log.isTraceEnabled()) {
                            CacheDrBatchManager.this.log.trace("Failed to send replication batch [hubId=" + targetId + ", batch=" + drInternalRequest + "]. Will retry with another sender hub.");
                        }
                        this.failedHubs.put(targetId, e);
                    }
                    boolean hubAlive = CacheDrBatchManager.this.cctx.discovery().node(targetId) != null;
                    if (sent && hubAlive) {
                        if (CacheDrBatchManager.this.log.isDebugEnabled()) {
                            CacheDrBatchManager.this.log.debug("Replication batch sent: " + drInternalRequest.id());
                        }
                        CacheDrBatchManager.this.metrics.get().onSenderCacheBatchSent(this.entryCount);
                        return;
                    }
                    if (!hubAlive) {
                        this.failedHubs.put(targetId, new IgniteCheckedException("Failed to send replication batch because sender hub has left the grid: " + targetId));
                    }
                    // The monitor is already held here, so no nested synchronized block is needed.
                    // If the assignment changed while sending (e.g. a response or a hub-leave event
                    // was handled re-entrantly), whoever changed it owns the retry.
                    if (!F.eq(this.hubId, targetId)) {
                        return;
                    }
                    this.hubId = null;
                    if (CacheDrBatchManager.this.log.isTraceEnabled()) {
                        CacheDrBatchManager.this.log.trace("Try resend replication batch: req=" + drInternalRequest.id());
                    }
                }
            }
        }

        /**
         * Handles an error response from a hub: the hub is recorded as failed and, if the request
         * is currently assigned to it, the request is sent again to another hub.
         *
         * @param uuid id of the hub that reported the error.
         * @param th reported error; asserted to be non-null.
         */
        void onError(UUID uuid, @Nullable Throwable th) {
            if (!$assertionsDisabled && th == null) {
                throw new AssertionError();
            }

            IgniteLogger logger = CacheDrBatchManager.this.log;

            if (logger.isDebugEnabled()) {
                logger.debug("Internal DR request failed: req=" + reqId + ", node=" + uuid + ", reason=" + th.getMessage());
            }

            synchronized (this) {
                failedHubs.put(uuid, th);

                if (!F.eq(hubId, uuid)) {
                    // The request is no longer assigned to the reporting hub.
                    return;
                }

                hubId = null;
                send();
            }
        }

        /**
         * Handles a non-error response code received from a hub.
         * <ul>
         *   <li>{@code 1} - if the request is assigned to that hub, the assignment is cleared and
         *       the request is rescheduled with a delay that starts at {@link #INITIAL_DELAY} and
         *       grows by a factor of 1.5 up to {@link #MAX_DELAY}.</li>
         *   <li>{@code 2} - delivery confirmation: the payload and permits are released and
         *       {@code lsnr.onSend()} is invoked; the request stays registered.</li>
         *   <li>{@code 0} - acknowledgement: the request completes as
         *       {@link CacheDrResultType#ACKNOWLEDGED}.</li>
         *   <li>anything else - logged as a warning and treated like an acknowledgement.</li>
         * </ul>
         *
         * @param uuid id of the responding hub.
         * @param b response code.
         */
        void onResponse(UUID uuid, byte b) {
            switch (b) {
                case 1:
                    synchronized (this) {
                        if (F.eq(this.hubId, uuid)) {
                            this.delay = this.delay == 0 ? INITIAL_DELAY : Math.min((long) (this.delay * 1.5d), MAX_DELAY);
                            this.hubId = null;
                            long resendDelay = this.delay;
                            if (CacheDrBatchManager.this.log.isTraceEnabled()) {
                                CacheDrBatchManager.this.log.trace("Internal DR request rescheduled: req=" + this.reqId + ", node=" + uuid + ", delay=" + resendDelay);
                            }
                            CacheDrBatchManager.this.cctx.time().schedule(this::send, resendDelay, -1L);
                            return;
                        }
                        return;
                    }
                case 2:
                    if (CacheDrBatchManager.this.log.isTraceEnabled()) {
                        CacheDrBatchManager.this.log.trace("Internal DR request confirmation received: req=" + this.reqId + ", node=" + uuid);
                    }
                    release();
                    this.lsnr.onSend();
                    return;
                default:
                    if (b != 0) {
                        // Use reqId rather than req.id(): the payload may already have been released
                        // (req == null) by an earlier confirmation, which used to cause an NPE here.
                        CacheDrBatchManager.this.log.warning("Internal DR response with unknown error code, consider delivery was successful: " + b + ", req=" + this.reqId + ", node=" + uuid);
                    } else if (CacheDrBatchManager.this.log.isTraceEnabled()) {
                        CacheDrBatchManager.this.log.trace("Internal DR request ack received: req=" + this.reqId + ", node=" + uuid);
                    }
                    onDone(CacheDrResultType.ACKNOWLEDGED);
                    return;
            }
        }

        /**
         * Drops the request payload and returns its permits to the manager. Only the first call
         * has an effect; later calls see {@code req == null} and do nothing.
         */
        private synchronized void release() {
            if (req == null) {
                return;
            }

            req = null;
            CacheDrBatchManager.this.releasePermit(size());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Completes this request with the given result. The first call unregisters the request
         * from {@code reqMap}, releases its payload and permits, notifies the batch callbacks and
         * metrics according to the result, and completes the future. Calls made after the future
         * is done are ignored.
         *
         * @param cacheDrResultType final result; asserted to be non-null.
         */
        public synchronized void onDone(CacheDrResultType cacheDrResultType) {
            if (!$assertionsDisabled && cacheDrResultType == null) {
                throw new AssertionError();
            }

            if (fut.isDone()) {
                return;
            }

            boolean unregistered = CacheDrBatchManager.this.reqMap.remove(Long.valueOf(reqId), this);

            if (!$assertionsDisabled && !unregistered) {
                throw new AssertionError();
            }

            release();

            IgniteLogger logger = CacheDrBatchManager.this.log;

            switch (cacheDrResultType) {
                case ACKNOWLEDGED:
                    lsnr.onAck();
                    CacheDrBatchManager.this.metrics.get().onSenderCacheBatchAcknowledged(entryCount);
                    break;

                case IGNORED:
                    lsnr.onReject();
                    break;

                case FAILED:
                    logger.info("Incremental DR batch failed, will retry: id=" + reqId + '.');
                    lsnr.onReject();
                    CacheDrBatchManager.this.metrics.get().onSenderCacheBatchFailed(entryCount);
                    break;

                default:
                    // Any other result type triggers no callback.
                    break;
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Internal DR request done: id=" + reqId + ", res=" + cacheDrResultType);
            }

            fut.onDone(cacheDrResultType);
        }

        // Decompiled form of the compiler-generated assertion setup: the flag is derived from the
        // desired assertion status of the outer CacheDrBatchManager class.
        static {
            $assertionsDisabled = !CacheDrBatchManager.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheDrBatchManager$Permit.class */
    /**
     * Counter of available permits with a fixed maximum. Both operations are synchronized on
     * the instance and neither of them blocks.
     */
    public static class Permit {
        /** Maximum (and initial) number of permits. */
        private final long maxPermits;
        /** Permits currently available; may become negative when an oversized request is granted. */
        private long permits;
        /** Decompiled form of the compiler-generated assertion flag. */
        static final /* synthetic */ boolean $assertionsDisabled;

        static {
            $assertionsDisabled = !CacheDrBatchManager.class.desiredAssertionStatus();
        }

        /**
         * @param j maximum number of permits; all of them are available initially.
         */
        private Permit(long j) {
            this.maxPermits = j;
            this.permits = j;
        }

        /**
         * Tries to take {@code i} permits without waiting. The request is granted when enough
         * permits are available, and also when none are currently taken
         * ({@code permits >= maxPermits}) - so a request larger than the maximum can still pass
         * while nothing else is outstanding.
         *
         * @param i number of permits to take.
         * @return {@code true} if the permits were taken, {@code false} otherwise.
         * @throws InterruptedException declared for callers; this implementation never throws it.
         */
        synchronized boolean acquire(int i) throws InterruptedException {
            boolean enoughAvailable = permits >= i;
            boolean nothingTaken = permits >= maxPermits;

            if (enoughAvailable || nothingTaken) {
                permits -= i;

                return true;
            }

            return false;
        }

        /**
         * Returns {@code i} permits. With assertions enabled, exceeding the maximum is an error.
         *
         * @param i number of permits to give back.
         */
        synchronized void release(int i) {
            permits += i;

            if (!$assertionsDisabled && !(permits <= maxPermits)) {
                throw new AssertionError();
            }
        }
    }

    /**
     * Creates a batch manager for one cache.
     *
     * @param gridCacheContext cache context; its name selects the response topic.
     * @param igniteLogger logger.
     * @param cacheSenderHubManager provider of the next sender hub node.
     * @param supplier supplier of the metrics object to report to.
     * @param gridBusyLock busy lock guarding response handling and resends.
     * @param j maximum number of permits; asserted to be positive.
     */
    public CacheDrBatchManager(GridCacheContext gridCacheContext, IgniteLogger igniteLogger, CacheSenderHubManager cacheSenderHubManager, Supplier<CacheDrMetrics> supplier, GridBusyLock gridBusyLock, long j) {
        if (!$assertionsDisabled && !(j > 0)) {
            throw new AssertionError();
        }

        // Collaborators handed in by the owner.
        this.cctx = gridCacheContext;
        this.log = igniteLogger;
        this.sndHubMgr = cacheSenderHubManager;
        this.metrics = supplier;
        this.busyLock = gridBusyLock;

        // State derived from the arguments.
        this.permits = new Permit(j);
        this.topic = CU.replicationTopicReceive(gridCacheContext.name());
    }

    /**
     * Registers the listener that receives {@link DrInternalResponse} messages on this manager's
     * topic and routes them to the matching {@link BatchRequest}. Responses for unknown request
     * ids, and responses arriving while the busy lock cannot be entered, are dropped.
     */
    public void start() {
        if (log.isTraceEnabled()) {
            log.trace("Start DR batch manager: cache=" + cctx.name());
        }

        GridMessageListener responseLsnr = new GridMessageListener() { // from class: org.gridgain.grid.internal.processors.cache.dr.ist.CacheDrBatchManager.1
            static final /* synthetic */ boolean $assertionsDisabled;

            static {
                $assertionsDisabled = !CacheDrBatchManager.class.desiredAssertionStatus();
            }

            @Override
            public void onMessage(UUID uuid, Object obj, byte b) {
                if (!(obj instanceof DrInternalResponse)) {
                    // Unexpected message: fail with assertions enabled, otherwise ignore it.
                    if (!$assertionsDisabled) {
                        throw new AssertionError("Unexpected message type: " + obj);
                    }

                    return;
                }

                if (CacheDrBatchManager.this.log.isTraceEnabled()) {
                    CacheDrBatchManager.this.log.trace("Received internal replication response message [sourceNodeId=" + uuid + ", msg=" + obj + ']');
                }

                DrInternalResponse response = (DrInternalResponse) obj;
                BatchRequest target = CacheDrBatchManager.this.reqMap.get(Long.valueOf(response.id()));

                if (target == null || !CacheDrBatchManager.this.busyLock.enterBusy()) {
                    return;
                }

                try {
                    response.finishUnmarshall(CacheDrBatchManager.this.cctx.marshaller(), U.resolveClassLoader(CacheDrBatchManager.this.cctx.gridConfig()));

                    if (response.error() == null) {
                        target.onResponse(uuid, response.code());
                    } else {
                        target.onError(uuid, response.error());
                    }
                } catch (IgniteCheckedException e) {
                    U.error(CacheDrBatchManager.this.log, "Failed to unmarshal message (will ignore): " + response, e);
                    target.onDone(CacheDrResultType.FAILED);
                } finally {
                    CacheDrBatchManager.this.busyLock.leaveBusy();
                }
            }
        };

        cctx.gridIO().addMessageListener(topic, responseLsnr);
    }

    /**
     * Removes the response listener and completes every still-registered request as
     * {@link CacheDrResultType#IGNORED}, then clears the request map.
     */
    public void stop() {
        if (log.isTraceEnabled()) {
            log.trace("Stop DR batch manager: cache=" + cctx.name());
        }

        cctx.kernalContext().io().removeMessageListener(topic);

        for (BatchRequest pending : reqMap.values()) {
            pending.onDone(CacheDrResultType.IGNORED);
        }

        reqMap.clear();
    }

    /**
     * Sends a batch using {@link #FAKE_FST_ID} as the id and an empty collection as the
     * {@code Collection<Byte>} argument of the four-argument overload.
     *
     * @param map entry buffers to send, keyed by a byte id.
     * @param drBatch callbacks notified about the fate of the batch.
     * @return future completed with the final result of the request.
     */
    public IgniteInternalFuture<CacheDrResultType> send(Map<Byte, EntryBuffer> map, DrBatch drBatch) {
        // Typed factory instead of the raw Collections.EMPTY_LIST constant (same shared immutable
        // instance, no unchecked conversion).
        return send(map, drBatch, FAKE_FST_ID, Collections.<Byte>emptyList());
    }

    /**
     * Builds an internal request from the given buffers, waits until enough permits are
     * available for its size, and starts sending it. If sending throws, the request is completed
     * as {@link CacheDrResultType#FAILED} and the error is rethrown.
     *
     * @param map entry buffers to send, keyed by a byte id.
     * @param drBatch callbacks notified about the fate of the batch.
     * @param igniteUuid id stored in the request.
     * @param collection byte collection stored in the request.
     * @return future completed with the final result of the request.
     */
    public IgniteInternalFuture<CacheDrResultType> send(Map<Byte, EntryBuffer> map, DrBatch drBatch, IgniteUuid igniteUuid, Collection<Byte> collection) {
        DrInternalRequest request = createInternalRequest(map, collection, igniteUuid);
        BatchRequest batch = new BatchRequest(request, drBatch);

        acquirePermit(batch.size());

        try {
            batch.send();
        } catch (Throwable failure) {
            batch.onDone(CacheDrResultType.FAILED);

            throw failure;
        }

        return batch.future();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Notifies every registered request that a sender hub has left. The notification runs in a
     * closure submitted via {@code runLocalSafe}; nothing is submitted when no request is
     * registered.
     *
     * @param uuid id of the hub that left.
     */
    public void onSenderNodeLeave(final UUID uuid) {
        if (reqMap.isEmpty()) {
            return;
        }

        GridPlainRunnable notifyTask = new GridPlainRunnable() { // from class: org.gridgain.grid.internal.processors.cache.dr.ist.CacheDrBatchManager.2
            @Override
            public void run() {
                try {
                    for (BatchRequest pending : CacheDrBatchManager.this.reqMap.values()) {
                        pending.onHubsLeave(uuid);
                    }
                } catch (Error | RuntimeException e) {
                    // Log before rethrowing so the failure is recorded by this manager as well.
                    U.error(CacheDrBatchManager.this.log, "Unexpected runtime exception.", e);

                    throw e;
                }
            }
        };

        cctx.closures().runLocalSafe(notifyTask);
    }

    /**
     * Flushes every buffer, turns it into a request entry and assembles a new internal request
     * with a freshly generated id. All buffers are closed quietly afterwards, whether or not the
     * request could be built.
     *
     * @param map entry buffers, keyed by a byte id.
     * @param collection byte collection stored in the request.
     * @param igniteUuid id stored in the request.
     * @return the new request.
     */
    private DrInternalRequest createInternalRequest(Map<Byte, EntryBuffer> map, Collection<Byte> collection, IgniteUuid igniteUuid) {
        ArrayList<DrInternalRequestEntry> reqEntries = new ArrayList<>(map.size());
        int totalEntries = 0;

        try {
            for (Map.Entry<Byte, EntryBuffer> mapping : map.entrySet()) {
                EntryBuffer buf = mapping.getValue();

                buf.flush();

                byte bufKey = mapping.getKey().byteValue();

                reqEntries.add(new DrInternalRequestEntry(bufKey, buf.entriesCnt(), buf.getBytes(), buf.sizeBytes()));

                totalEntries += buf.entriesCnt();
            }

            long newId = ID_GEN.incrementAndGet();

            return new DrInternalRequest(newId, cctx.name(), collection, reqEntries, totalEntries, igniteUuid);
        } finally {
            for (EntryBuffer buf : map.values()) {
                U.closeQuiet(buf);
            }
        }
    }

    /**
     * @return sum of the entry counts of all registered requests, or {@code 0} when the busy
     *      lock cannot be entered.
     */
    public int queuedKeysCount() {
        if (!busyLock.enterBusy()) {
            return 0;
        }

        try {
            return reqMap.values()
                .stream()
                .mapToInt(BatchRequest::entriesCount)
                .sum();
        } finally {
            busyLock.leaveBusy();
        }
    }

    /**
     * @return number of requests currently registered in the request map, i.e. not completed yet.
     */
    public int batchWaitingAcknowledgeCount() {
        final int registered = reqMap.size();

        return registered;
    }

    /**
     * Blocks until {@code i} permits could be taken. While waiting, the thread sleeps on this
     * manager's monitor for up to 500 ms at a time (it is woken early by {@code releasePermit})
     * and reports the elapsed waiting time to the metrics as throttling.
     *
     * @param i number of permits to take.
     * @throws IgniteInterruptedException if the waiting thread is interrupted.
     */
    private synchronized void acquirePermit(int i) {
        try {
            long lastReported = U.currentTimeMillis();

            while (true) {
                if (permits.acquire(i)) {
                    return;
                }

                wait(500L);

                long now = U.currentTimeMillis();
                long waited = now - lastReported;

                // Report only time that has actually elapsed, and restart the measurement from here.
                if (waited > 0) {
                    metrics.get().onFstThrottling(waited);
                    lastReported = now;
                }
            }
        } catch (InterruptedException e) {
            throw new IgniteInterruptedException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Gives {@code i} permits back and wakes up all threads waiting in {@code acquirePermit}.
     *
     * @param i number of permits to return.
     */
    public synchronized void releasePermit(int i) {
        permits.release(i);

        // Waiters sleep on this manager's monitor, so notify on the same object.
        notifyAll();
    }

    // Static state: the decompiled assertion flag, the request id counter (starts at 0, so the
    // first id handed out by incrementAndGet() is 1), and a random id used as FAKE_FST_ID.
    static {
        $assertionsDisabled = !CacheDrBatchManager.class.desiredAssertionStatus();
        ID_GEN = new AtomicLong();
        FAKE_FST_ID = IgniteUuid.randomUuid();
    }
}
