package org.gridgain.grid.internal.processors.cache.dr.ist;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.GridBusyLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.gridgain.grid.cache.dr.CacheDrSenderConfiguration;
import org.gridgain.grid.configuration.DrSenderConfiguration;
import org.gridgain.grid.configuration.GridGainConfiguration;
import org.gridgain.grid.dr.DrSenderLoadBalancingMode;
import org.gridgain.grid.internal.GridPluginNodeAttributes;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrMetrics;
import org.gridgain.grid.internal.processors.cache.dr.CacheDrResultType;
import org.gridgain.grid.internal.processors.cache.dr.EntryBuffer;
import org.gridgain.grid.internal.processors.cache.dr.ist.LoadBalancer;
import org.gridgain.grid.internal.processors.dr.DrProcessor;
import org.gridgain.grid.internal.processors.dr.DrSenderAttributes;
import org.gridgain.grid.internal.processors.dr.DrUtils;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequest;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalRequestEntry;
import org.gridgain.grid.internal.processors.dr.messages.DrInternalResponse;
import org.jetbrains.annotations.Nullable;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheSenderHubManager.class */
public class CacheSenderHubManager {
    /** Source of request IDs; incremented once for every request built by {@code createInternalRequest}. */
    private static final AtomicLong ID_GEN;

    /**
     * Random ID generated once at class initialization. The two-argument {@code send} passes it as the
     * {@code IgniteUuid} of the request (presumably meaning "not part of a full state transfer" - confirm).
     */
    public static final IgniteUuid FAKE_FST_ID;

    /** GridGain plugin configuration; consulted for the sender hub configuration and the "use cache names" mode. */
    private final GridGainConfiguration ggCfg;

    /** DR sender configuration of the cache this manager serves. */
    private final CacheDrSenderConfiguration sndCfg;

    /**
     * Sender representing the local node. Non-null only when the local node hosts a matching sender and
     * the cache configuration prefers the local sender; when set, it is the only sender ever selected.
     */
    private final DrSenderNode locSnd;

    /** Logger obtained from the cache context. */
    private final IgniteLogger log;

    /** Context of the cache this manager serves. */
    private final GridCacheContext<?, ?> cctx;

    /** Communication manager used to register the response listener and to create sender nodes. */
    private final GridIoManager ioMgr;

    /** DR processor; resend tasks are submitted to it. */
    private final DrProcessor drProc;

    /** Strategy that picks one sender out of the currently active ones. */
    private final LoadBalancer<DrSenderNode> loadBalancer;

    /** Communication topic on which responses for this cache are received. */
    private final Object topic;

    /** Supplier of the metrics object updated on send, acknowledge, reject, failure and throttling. */
    private final Supplier<CacheDrMetrics> metrics;

    /** Busy lock; blocked by {@code stopServingRequests} so that no operation runs after stop. */
    private final GridBusyLock busyLock;

    /** Budget limiting the summed data length of batches that have been submitted but not yet confirmed. */
    private final Permit permits;

    /** Set to {@code true} at the beginning of {@code stopServingRequests}; makes waiting and resend loops exit. */
    private volatile boolean stopping;

    /** Compiler-style assertion flag (decompiled form of the {@code assert} statement); assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Registered sender nodes keyed by node ID. */
    private final ConcurrentMap<UUID, DrSenderNode> sndNodes = new ConcurrentHashMap();

    /** Batches that were submitted and are not completed yet, keyed by request ID. */
    private final ConcurrentMap<Long, BatchRequest> reqMap = new ConcurrentHashMap();

    /** IDs of requests that could not be delivered (or were rescheduled by a sender) and must be sent again. */
    private final Queue<Long> reqIdsToResend = new LinkedBlockingQueue();

    /** Guards against scheduling more than one delayed resend task at a time. */
    private final AtomicBoolean resendScheduled = new AtomicBoolean();

    /** Data center IDs to ignore; replaced from a sender's ignore list when it registers while no sender is registered. */
    private volatile BitSet ignoredDcs = new BitSet();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheSenderHubManager$BatchRequest.class */
    /**
     * A single replication batch tracked by the enclosing manager from submission until completion.
     * <p>
     * A batch goes through two independent one-shot transitions:
     * <ul>
     *     <li>{@link #confirm()} - the request payload is dropped, the permits held by the batch are
     *     returned and the listener is told the batch was sent;</li>
     *     <li>{@link #complete(CacheDrResultType, Throwable)} - the batch is removed from the pending
     *     request map and the listener and metrics are notified about the final outcome.</li>
     * </ul>
     */
    public class BatchRequest {
        /** Request ID, copied from the wrapped request. */
        private final long reqId;

        /** Sum of {@code dataLength()} over all request entries; the number of permits this batch holds. */
        private final int size;

        /** Number of entries reported by the wrapped request. */
        private final int entryCnt;

        /** Listener notified about sent / acked / rejected events. */
        private final DrBatchEventListener lsnr;

        /** Wrapped request; set to {@code null} once the batch has been confirmed. */
        private volatile DrInternalRequest req;

        /** Ensures that completion side effects run at most once. */
        private final AtomicBoolean completed = new AtomicBoolean();

        /**
         * Wraps the given request and marks it as forced.
         *
         * @param drInternalRequest Request to track.
         * @param drBatchEventListener Listener to notify about batch events.
         */
        private BatchRequest(DrInternalRequest drInternalRequest, DrBatchEventListener drBatchEventListener) {
            this.req = drInternalRequest;
            this.lsnr = drBatchEventListener;
            // A sequential stream is enough for this small sum; a parallel stream would only add
            // fork/join overhead and occupy the shared common pool.
            this.size = drInternalRequest.entries().stream().mapToInt(entry -> entry.dataLength()).sum();
            this.entryCnt = drInternalRequest.entryCount();
            this.reqId = drInternalRequest.id();
            this.req.force(true);
        }

        /**
         * @return Request ID.
         */
        public long id() {
            return this.reqId;
        }

        /**
         * @return Number of entries in the batch.
         */
        public int entriesCount() {
            return this.entryCnt;
        }

        /**
         * @return Summed data length of the batch entries.
         */
        public int size() {
            return this.size;
        }

        /**
         * Marks the batch as confirmed. Only the first call has an effect: it drops the request
         * payload, returns the permits held by the batch and invokes {@code onSent()} on the listener.
         */
        public void confirm() {
            synchronized (this) {
                if (this.req == null) {
                    // Already confirmed.
                    return;
                }

                this.req = null;

                releasePermit(size());
                this.lsnr.onSent();
            }
        }

        /**
         * Completes the batch with the given result. Only the first call has an effect: the batch is
         * removed from the pending request map, confirmed (if it was not yet), and the listener and
         * metrics are notified according to the result type.
         *
         * @param cacheDrResultType Final result; must not be {@code null}.
         * @param th Failure cause, passed to the listener only for {@code FAILED}.
         */
        public void complete(CacheDrResultType cacheDrResultType, @Nullable Throwable th) {
            assert cacheDrResultType != null;

            if (!this.completed.compareAndSet(false, true)) {
                return;
            }

            BatchRequest removed = reqMap.remove(this.reqId);

            confirm();

            switch (cacheDrResultType) {
                case ACKNOWLEDGED:
                    // An acknowledged batch must still have been registered as pending.
                    assert removed != null;

                    if (log.isDebugEnabled()) {
                        log.debug("Incremental DR batch acked: id=" + this.reqId);
                    }

                    this.lsnr.onAcked();
                    metrics.get().onSenderCacheBatchAcknowledged(this.entryCnt);

                    break;

                case IGNORED:
                    if (log.isDebugEnabled()) {
                        log.debug("Internal DR request ignored: id=" + this.reqId);
                    }

                    this.lsnr.onRejected(null);
                    metrics.get().onSenderCacheBatchRejected(this.entryCnt);

                    break;

                case FAILED:
                    log.info("Incremental DR batch failed, will retry: id=" + this.reqId);

                    this.lsnr.onRejected(th);
                    metrics.get().onSenderCacheBatchFailed(this.entryCnt);

                    break;

                default:
                    break;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gridgain/grid/internal/processors/cache/dr/ist/CacheSenderHubManager$Permit.class */
    public static class Permit {
        private final long maxPermits;
        private long permits;
        static final /* synthetic */ boolean $assertionsDisabled;

        private Permit(long j) {
            this.permits = j;
            this.maxPermits = j;
        }

        synchronized boolean acquire(int i) {
            if (this.permits < i && this.permits < this.maxPermits) {
                return false;
            }
            this.permits -= i;
            return true;
        }

        synchronized void release(int i) {
            this.permits += i;
            if (!$assertionsDisabled && this.permits > this.maxPermits) {
                throw new AssertionError();
            }
        }

        static {
            $assertionsDisabled = !CacheSenderHubManager.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a manager for the given cache.
     *
     * @param gridCacheContext Cache context; must not be {@code null}.
     * @param gridGainConfiguration GridGain plugin configuration; must not be {@code null}.
     * @param cacheDrSenderConfiguration DR sender configuration of the cache; must not be {@code null}.
     * @param drProcessor DR processor used to run resend tasks.
     * @param supplier Supplier of the metrics object to update.
     * @param j Total permit budget for batches that are in flight.
     * @throws NullPointerException If any of the first three arguments is {@code null}.
     */
    public CacheSenderHubManager(GridCacheContext gridCacheContext, GridGainConfiguration gridGainConfiguration, CacheDrSenderConfiguration cacheDrSenderConfiguration, DrProcessor drProcessor, Supplier<CacheDrMetrics> supplier, long j) {
        // Objects.nonNull() only returns a boolean; requireNonNull() actually enforces the contract.
        Objects.requireNonNull(gridCacheContext, "gridCacheContext");
        Objects.requireNonNull(gridGainConfiguration, "gridGainConfiguration");
        Objects.requireNonNull(cacheDrSenderConfiguration, "cacheDrSenderConfiguration");

        this.cctx = gridCacheContext;
        this.ggCfg = gridGainConfiguration;
        this.sndCfg = cacheDrSenderConfiguration;
        this.drProc = drProcessor;
        this.metrics = supplier;
        this.log = gridCacheContext.logger(CacheSenderHubManager.class);
        this.ioMgr = gridCacheContext.gridIO();
        this.topic = CU.replicationTopicReceive(gridCacheContext.name());
        this.permits = new Permit(j);
        this.busyLock = new GridBusyLock();

        // The local node is pinned as the only sender when it hosts a matching sender hub
        // and the cache is configured to prefer it.
        this.locSnd = (hasLocalSender(gridCacheContext, cacheDrSenderConfiguration) && cacheDrSenderConfiguration.isPreferLocalSender()) ? fromClusterNodeId(gridCacheContext.localNodeId()) : null;

        this.loadBalancer = cacheDrSenderConfiguration.getLoadBalancingMode() == DrSenderLoadBalancingMode.DR_ROUND_ROBIN ? new LoadBalancer.RoundRobinBalancer() : new LoadBalancer.RandomBalancer();
    }

    /**
     * Checks whether the local node is configured as a sender hub that serves the given cache.
     *
     * @param gridCacheContext Cache context (provides the cache name).
     * @param cacheDrSenderConfiguration DR sender configuration of the cache (provides the sender group).
     * @return {@code true} if the local sender hub configuration covers the cache, either by cache
     *      name or by sender group depending on the configured mode.
     */
    private boolean hasLocalSender(GridCacheContext gridCacheContext, CacheDrSenderConfiguration cacheDrSenderConfiguration) {
        DrSenderConfiguration hubCfg = this.ggCfg.getDrSenderConfiguration();

        // No sender hub is configured on this node at all.
        if (hubCfg == null) {
            return false;
        }

        if (!this.ggCfg.isDrUseCacheNames()) {
            // Group mode: cache names must not be configured.
            if (!$assertionsDisabled && !F.isEmpty(hubCfg.getCacheNames())) {
                throw new AssertionError("cache names are not allowed.");
            }

            List<String> hubGroups = Arrays.asList(DrUtils.effectiveSenderGroups(hubCfg));

            return hubGroups.contains(DrUtils.effectiveSenderGroup(cacheDrSenderConfiguration));
        }

        // Cache-name mode: the list of names is expected to be non-empty.
        if (!$assertionsDisabled && F.isEmpty(hubCfg.getCacheNames())) {
            throw new AssertionError();
        }

        return Arrays.stream(hubCfg.getCacheNames()).anyMatch(cacheName -> cacheName.equals(gridCacheContext.name()));
    }

    /**
     * Tells whether updates for the given data center are to be ignored.
     *
     * @param b Data center ID.
     * @return {@code true} if the bit for this ID is set in the current ignore set.
     */
    public boolean shouldIgnoreDc(byte b) {
        BitSet ignored = this.ignoredDcs;

        return ignored.get(b);
    }

    /**
     * Checks whether the given node is a sender hub that serves this manager's cache.
     *
     * @param clusterNode Node to check.
     * @return {@code true} if the node carries the sender hub attribute and covers the cache, either by
     *      (masked) cache name or by sender group depending on the configured mode.
     */
    public boolean isSenderNode(ClusterNode clusterNode) {
        Object hubAttr = clusterNode.attribute(GridPluginNodeAttributes.ATTR_REPLICATION_IST_SND_HUB);

        // Not a sender hub at all.
        if (hubAttr == null) {
            return false;
        }

        if (!this.ggCfg.isDrUseCacheNames()) {
            // Group mode: the node must list the effective sender group of this cache.
            String[] nodeGroups = (String[]) clusterNode.attribute(GridPluginNodeAttributes.ATTR_REPLICATION_IST_SND_GROUPS);

            if (F.isEmpty(nodeGroups)) {
                return false;
            }

            return Arrays.asList(nodeGroups).contains(DrUtils.effectiveSenderGroup(this.sndCfg));
        }

        // Cache-name mode: the node must list the masked name of this cache.
        DrSenderAttributes hubAttrs = (DrSenderAttributes) clusterNode.attribute(GridPluginNodeAttributes.ATTR_REPLICATION_IST_SND_HUB);
        String maskedName = CU.mask(this.cctx.name());

        if (hubAttrs == null) {
            return false;
        }

        return hubAttrs.getCacheNames().stream().anyMatch(cacheName -> cacheName.equals(maskedName));
    }

    /**
     * Chooses the sender for the next request.
     * <p>
     * When a local sender is pinned, it is the only candidate. Otherwise the load balancer picks one
     * of the registered senders that are currently active.
     *
     * @return Sender to use, or {@code null} if no active sender is available.
     */
    @Nullable
    private DrSenderNode nextSender() {
        DrSenderNode pinned = this.locSnd;

        if (pinned != null) {
            return pinned.active() ? pinned : null;
        }

        List<DrSenderNode> candidates = new ArrayList<>();

        for (DrSenderNode snd : this.sndNodes.values()) {
            if (snd.active()) {
                candidates.add(snd);
            }
        }

        return candidates.isEmpty() ? null : this.loadBalancer.apply(candidates);
    }

    /**
     * Registers the listener that receives responses from sender hubs on this cache's topic.
     * <p>
     * Each response is unmarshalled and handed to {@code onResponse}. If that fails with an
     * {@code IgniteCheckedException}, the pending batch with the response ID (if any) is completed as
     * {@code FAILED}. Responses arriving after the busy lock has been blocked are dropped.
     */
    public void startServingRequests() {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Start DR batch manager: cache=" + this.cctx.name());
        }

        this.ioMgr.addMessageListener(this.topic, new GridMessageListener() {
            @Override
            public void onMessage(UUID uuid, Object obj, byte b) {
                if (!(obj instanceof DrInternalResponse)) {
                    assert false : "Unexpected message type: " + obj;

                    return;
                }

                if (log.isTraceEnabled()) {
                    log.trace("Received internal replication response message [sourceNodeId=" + uuid + ", msg=" + obj + ']');
                }

                // The manager is stopping (or stopped): drop the response.
                if (!busyLock.enterBusy()) {
                    return;
                }

                DrInternalResponse res = (DrInternalResponse) obj;

                try {
                    res.finishUnmarshall(cctx.marshaller(), U.resolveClassLoader(cctx.gridConfig()));

                    onResponse(uuid, res);
                } catch (IgniteCheckedException e) {
                    U.error(log, "Failed to unmarshal message (will ignore): " + obj, e);

                    BatchRequest pending = reqMap.get(Long.valueOf(res.id()));

                    if (pending != null) {
                        pending.complete(CacheDrResultType.FAILED, e);
                    }
                } finally {
                    busyLock.leaveBusy();
                }
            }
        });
    }

    /**
     * Sends a batch using {@link #FAKE_FST_ID} as the request's {@code IgniteUuid} and an empty
     * collection of data center IDs.
     *
     * @param map Entry buffers keyed by a byte ID.
     * @param drBatchEventListener Listener notified about batch events.
     */
    public void send(Map<Byte, EntryBuffer> map, DrBatchEventListener drBatchEventListener) {
        Collection<Byte> noDataCenters = Collections.emptyList();

        send(map, drBatchEventListener, FAKE_FST_ID, noDataCenters);
    }

    /**
     * Builds a request out of the given buffers and submits it to a sender hub.
     * <p>
     * The call blocks, in 100 ms steps, until enough permits are available for the batch or the
     * manager starts stopping. Before every attempt the requests queued for resend are retried. Once
     * the permits are acquired, the batch is registered as pending and sent; if no sender could take
     * it, a delayed resend is scheduled.
     * <p>
     * If the manager is already stopped, or starts stopping while the call waits for permits, the
     * batch is dropped without notifying the listener.
     *
     * @param map Entry buffers keyed by a byte ID; they are flushed and closed while the request is built.
     * @param drBatchEventListener Listener notified about batch events.
     * @param igniteUuid ID passed to the created request.
     * @param collection Collection of byte IDs passed to the created request.
     */
    public void send(Map<Byte, EntryBuffer> map, DrBatchEventListener drBatchEventListener, IgniteUuid igniteUuid, Collection<Byte> collection) {
        BatchRequest batchRequest = new BatchRequest(createInternalRequest(map, collection, igniteUuid), drBatchEventListener);

        if (!this.busyLock.enterBusy()) {
            return;
        }

        // The busy lock is taken once, so it is released exactly once, whatever way the loop ends.
        try {
            while (true) {
                // Give previously undelivered requests a chance first.
                resend0();

                if (acquirePermit(batchRequest.size(), 100L)) {
                    BatchRequest put = this.reqMap.put(Long.valueOf(batchRequest.id()), batchRequest);

                    if (!$assertionsDisabled && put != null) {
                        throw new AssertionError();
                    }

                    if (!send(batchRequest.req)) {
                        scheduleResend();
                    }

                    // The batch is registered and handed over (or queued for resend): done.
                    return;
                }

                if (this.stopping) {
                    return;
                }
            }
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Retries the requests queued for resend, unless the queue is empty or the manager is stopped.
     * Schedules another delayed attempt when a request could not be delivered again.
     */
    private void resend() {
        if (this.reqIdsToResend.isEmpty()) {
            return;
        }

        if (!this.busyLock.enterBusy()) {
            return;
        }

        try {
            boolean allProcessed = resend0();

            if (!allProcessed) {
                scheduleResend();
            }
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * Drains the resend queue, sending every request that is still pending.
     * <p>
     * A pending batch whose payload is already gone (it was confirmed), or for which no sender is
     * registered at all, is completed as {@code IGNORED} instead of being sent.
     *
     * @return {@code false} if a send attempt failed (the request is back in the queue), {@code true}
     *      if the queue was drained or the manager is stopping.
     */
    private boolean resend0() {
        while (!this.stopping) {
            Long reqId = this.reqIdsToResend.poll();

            if (reqId == null) {
                break;
            }

            BatchRequest pending = this.reqMap.get(reqId);

            // The batch has been completed in the meantime.
            if (pending == null) {
                continue;
            }

            DrInternalRequest payload = pending.req;

            if (payload == null || this.sndNodes.isEmpty()) {
                pending.complete(CacheDrResultType.IGNORED, null);

                continue;
            }

            if (!send(payload)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Schedules a single resend attempt in 100 ms, unless one is already scheduled.
     * The scheduled action submits the resend to the DR processor, which clears the "scheduled" flag
     * right before running it.
     */
    private void scheduleResend() {
        // Somebody else has already scheduled the task.
        if (!this.resendScheduled.compareAndSet(false, true)) {
            return;
        }

        this.cctx.time().schedule(() -> {
            this.drProc.submit(() -> {
                if (!this.resendScheduled.compareAndSet(true, false)) {
                    return;
                }

                resend();
            });
        }, 100L, -1L);
    }

    /**
     * Sends the request to the next available sender.
     * <p>
     * If there is no sender, or sending fails, the request ID is added to the resend queue; a sender
     * that failed is temporarily switched off.
     *
     * @param drInternalRequest Request to send.
     * @return {@code true} if the request was handed over to a sender, {@code false} if it was queued
     *      for resend.
     */
    private boolean send(DrInternalRequest drInternalRequest) {
        DrSenderNode target = nextSender();

        if (target != null) {
            try {
                target.send(drInternalRequest);

                this.metrics.get().onSenderCacheBatchSent(drInternalRequest.entryCount());

                return true;
            } catch (IgniteCheckedException e) {
                this.log.info("Unable to sent request to sender: reqId=" + drInternalRequest.id() + ", nodeId=" + target.id() + ", err=" + e);

                target.temporarilySwitchOff();
            }
        }

        // Either no sender is available or the chosen one failed: retry later.
        this.reqIdsToResend.add(Long.valueOf(drInternalRequest.id()));

        return false;
    }

    /**
     * Tries to acquire permits, waiting once for at most the given timeout if they are not available
     * immediately. The time spent waiting is reported to the metrics as throttling time.
     *
     * @param i Amount of permits to acquire.
     * @param j Maximum time to wait, in milliseconds.
     * @return {@code true} if the permits were acquired, {@code false} otherwise.
     * @throws IgniteInterruptedException If the thread was interrupted while waiting; the interrupt
     *      status of the thread is restored before the exception is thrown.
     */
    private synchronized boolean acquirePermit(int i, long j) {
        if (this.permits.acquire(i)) {
            return true;
        }

        long startTs = U.currentTimeMillis();

        try {
            // Woken up by releasePermit() or by the timeout.
            wait(j);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the code further up the stack.
            Thread.currentThread().interrupt();

            throw new IgniteInterruptedException(e);
        }

        long waited = U.currentTimeMillis() - startTs;

        if (waited > 0) {
            this.metrics.get().onFstThrottling(waited);
        }

        return this.permits.acquire(i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns permits to the budget and wakes up all threads waiting in {@code acquirePermit}.
     *
     * @param i Amount of permits to return.
     */
    public synchronized void releasePermit(int i) {
        Permit budget = this.permits;

        budget.release(i);

        this.notifyAll();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers a sender hub node for this cache.
     * <p>
     * If no sender is registered at the moment of the call, the set of ignored data centers is
     * replaced with the ignore list published in the node's sender hub attribute.
     *
     * @param clusterNode Sender hub node.
     * @return {@code true} if the sender was registered, {@code false} if the node is not known to
     *      discovery or a sender with the same node ID is already registered.
     */
    public boolean registerSender(ClusterNode clusterNode) {
        ClusterNode node = this.cctx.discovery().node(clusterNode.id());

        // The node is not in the topology.
        if (node == null) {
            return false;
        }

        if (!$assertionsDisabled && !isSenderNode(node)) {
            throw new AssertionError();
        }

        // Reuse the pinned local sender object when the registered node is the local one.
        DrSenderNode snd = (this.locSnd == null || !node.id().equals(this.locSnd.id())) ? fromClusterNodeId(node.id()) : this.locSnd;

        if (this.sndNodes.isEmpty()) {
            BitSet ignored = new BitSet();

            Collection<Byte> ignoreList = ((DrSenderAttributes) node.attribute(GridPluginNodeAttributes.ATTR_REPLICATION_IST_SND_HUB)).getIgnoreList();

            ignoreList.forEach(ignored::set);

            this.ignoredDcs = ignored;
        }

        if (this.sndNodes.putIfAbsent(snd.id(), snd) != null) {
            return false;
        }

        if (this.log.isDebugEnabled()) {
            this.log.debug("New sender registered: cache=" + this.cctx.name() + ", sndHubNode=" + node.id());
        }

        return true;
    }

    /**
     * Creates a sender node object for the given node ID, backed by this manager's communication
     * manager and the cache context's {@code time()} component.
     *
     * @param uuid Node ID.
     * @return New sender node.
     */
    private DrSenderNode fromClusterNodeId(UUID uuid) {
        DrSenderNode snd = new DrSenderNode(this.ioMgr, this.cctx.time(), uuid);

        return snd;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters the sender with the given node ID. Every pending batch the sender reports as
     * unprocessed is completed as {@code IGNORED}.
     *
     * @param uuid ID of the sender hub node.
     * @return {@code true} if no senders are registered after the call.
     */
    public boolean unregisterSender(UUID uuid) {
        DrSenderNode removed = this.sndNodes.remove(uuid);

        if (removed != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Sender unregistered: cache=" + this.cctx.name() + ", sndHubNode=" + uuid);
            }

            for (Long reqId : removed.unprocessedRequests()) {
                BatchRequest pending = this.reqMap.get(reqId);

                if (pending != null) {
                    pending.complete(CacheDrResultType.IGNORED, null);
                }
            }
        }

        return this.sndNodes.isEmpty();
    }

    /**
     * @return Number of currently registered senders.
     */
    public int sendersCnt() {
        int registered = this.sndNodes.size();

        return registered;
    }

    /**
     * Counts the entries of all batches that are still pending.
     *
     * @return Total number of entries in pending batches, or {@code 0} if the manager is stopped.
     */
    public int queuedKeysCount() {
        if (!this.busyLock.enterBusy()) {
            return 0;
        }

        try {
            int total = 0;

            for (BatchRequest pending : this.reqMap.values()) {
                total += pending.entriesCount();
            }

            return total;
        } finally {
            this.busyLock.leaveBusy();
        }
    }

    /**
     * @return Number of batches that have been submitted and are not completed yet.
     */
    public int batchWaitingAcknowledgeCount() {
        int pending = this.reqMap.size();

        return pending;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a response received from a sender hub.
     * <p>
     * Responses for unknown (already completed) requests are only logged. Otherwise the response is
     * forwarded to the sender node object, if it is still registered, and then handled by code:
     * <ul>
     *     <li>{@code 1} - the request is queued for resend and counted as rejected;</li>
     *     <li>{@code 2} - the batch is confirmed;</li>
     *     <li>anything else - the batch is completed: {@code ACKNOWLEDGED} when the response carries
     *     no error, {@code IGNORED} otherwise. A non-zero code is additionally logged as unknown.</li>
     * </ul>
     *
     * @param uuid ID of the node the response came from.
     * @param drInternalResponse Response; must not be {@code null}.
     */
    public void onResponse(UUID uuid, DrInternalResponse drInternalResponse) {
        if (!$assertionsDisabled && drInternalResponse == null) {
            throw new AssertionError();
        }

        BatchRequest batchRequest = this.reqMap.get(Long.valueOf(drInternalResponse.id()));

        if (batchRequest == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Got response for unknown DR request: req=" + drInternalResponse.id() + ", code=" + ((int) drInternalResponse.code()) + ", node=" + uuid);
            }

            return;
        }

        DrSenderNode drSenderNode = this.sndNodes.get(uuid);

        if (drSenderNode != null) {
            drSenderNode.onResponse(drInternalResponse);
        }

        byte code = drInternalResponse.code();

        switch (code) {
            case 1:
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Internal DR request rescheduled: req=" + drInternalResponse.id() + ", node=" + uuid);
                }

                this.reqIdsToResend.add(Long.valueOf(drInternalResponse.id()));

                scheduleResend();

                this.metrics.get().onSenderCacheBatchRejected(batchRequest.entriesCount());

                return;

            case 2:
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Internal DR request confirmation received: req=" + drInternalResponse.id() + ", node=" + uuid);
                }

                batchRequest.confirm();

                return;

            default:
                if (code != 0) {
                    // A separator was missing between the code and "req=", which glued the two values together.
                    this.log.warning("Internal DR response with unknown error code, consider delivery was successful: " + ((int) code) + ", req=" + drInternalResponse.id() + ", node=" + uuid);
                }

                batchRequest.complete(drInternalResponse.error() == null ? CacheDrResultType.ACKNOWLEDGED : CacheDrResultType.IGNORED, drInternalResponse.error());

                return;
        }
    }

    /**
     * Builds an internal request out of the given entry buffers.
     * <p>
     * Every buffer is flushed and turned into one request entry. All buffers are closed quietly
     * before the method returns, whether or not the request was built successfully.
     *
     * @param map Entry buffers keyed by a byte ID.
     * @param collection Collection of byte IDs passed to the request as is.
     * @param igniteUuid ID passed to the request as is.
     * @return New request with a freshly generated ID.
     */
    private DrInternalRequest createInternalRequest(Map<Byte, EntryBuffer> map, Collection<Byte> collection, IgniteUuid igniteUuid) {
        List<DrInternalRequestEntry> reqEntries = new ArrayList<>(map.size());
        int totalEntries = 0;

        try {
            for (Map.Entry<Byte, EntryBuffer> mapEntry : map.entrySet()) {
                EntryBuffer buf = mapEntry.getValue();

                buf.flush();

                byte id = mapEntry.getKey().byteValue();

                reqEntries.add(new DrInternalRequestEntry(id, buf.entriesCnt(), buf.getBytes(), buf.sizeBytes()));

                totalEntries += buf.entriesCnt();
            }

            return new DrInternalRequest(ID_GEN.incrementAndGet(), this.cctx.name(), collection, reqEntries, totalEntries, igniteUuid);
        } finally {
            for (EntryBuffer buf : map.values()) {
                U.closeQuiet(buf);
            }
        }
    }

    /**
     * Stops the manager: raises the stopping flag, blocks the busy lock, removes the response
     * listener, completes all pending batches as {@code IGNORED} and clears the internal state
     * (pending batches, resend queue and the senders' unprocessed requests).
     */
    public void stopServingRequests() {
        // Raise the flag first so that loops holding the busy lock can exit.
        this.stopping = true;

        this.busyLock.block();

        if (this.log.isDebugEnabled()) {
            this.log.debug("Stop DR batch manager: cache=" + this.cctx.name());
        }

        this.ioMgr.removeMessageListener(this.topic);

        for (BatchRequest pending : this.reqMap.values()) {
            pending.complete(CacheDrResultType.IGNORED, null);
        }

        this.reqMap.clear();
        this.reqIdsToResend.clear();

        for (DrSenderNode snd : this.sndNodes.values()) {
            snd.clearUnprocessedRequests();
        }
    }

    static {
        // Request IDs start from zero for every class load.
        ID_GEN = new AtomicLong();

        // One random ID per class load.
        FAKE_FST_ID = IgniteUuid.randomUuid();

        // Explicit form of the flag the compiler generates for the assert statement.
        $assertionsDisabled = !CacheSenderHubManager.class.desiredAssertionStatus();
    }
}
