package org.apache.ignite.raft.jraft.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.class */
public class StripedDisruptor<T extends NodeIdAware> {
    private static final IgniteLogger LOG;
    private final Disruptor<T>[] disruptors;
    private final RingBuffer<T>[] queues;
    private final ArrayList<StripedDisruptor<T>.StripeEntryHandler> eventHandlers;
    private final ArrayList<StripedDisruptor<T>.StripeExceptionHandler> exceptionHandlers;
    private final int stripes;
    private final String name;
    private final RaftMetricSource.DisruptorMetrics metrics;
    private final boolean sharedStripe;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final NodeId FAKE_NODE_ID = new NodeId(null, null);
    private ConcurrentHashMap<NodeId, Integer> stripeMapper = new ConcurrentHashMap<>();
    private final AtomicInteger incrementalCounter = new AtomicInteger();

    /* loaded from: input_file:org/apache/ignite/raft/jraft/disruptor/StripedDisruptor$StripeEntryHandler.class */
    private class StripeEntryHandler implements EventHandler<T> {
        private final Map<NodeId, EventHandler<T>> subscribers = new HashMap();
        private final Map<NodeId, T> eventCache = new HashMap();
        private final Map<NodeId, Integer> currentBatchSizes = new HashMap();
        private final int stripeId;

        StripeEntryHandler(int i) {
            this.stripeId = i;
        }

        public void onEvent(T t, long j, boolean z) throws Exception {
            if (t.evtType != DisruptorEventType.SUBSCRIBE) {
                internalBatching(t, j);
            } else if (t.handler == null) {
                this.subscribers.remove(t.nodeId());
            } else {
                this.subscribers.put(t.nodeId(), t.handler);
            }
            if (z) {
                for (Map.Entry<NodeId, T> entry : this.eventCache.entrySet()) {
                    EventHandler<T> eventHandler = this.subscribers.get(entry.getValue().nodeId());
                    if (eventHandler != null) {
                        if (StripedDisruptor.this.metrics != null && StripedDisruptor.this.metrics.enabled()) {
                            StripedDisruptor.this.metrics.hitToStripe(this.stripeId);
                            StripedDisruptor.this.metrics.addBatchSize(this.currentBatchSizes.getOrDefault(entry.getKey(), 0).intValue() + 1);
                        }
                        eventHandler.onEvent(entry.getValue(), j, true);
                    }
                }
                this.currentBatchSizes.clear();
                this.eventCache.clear();
            }
        }

        private void internalBatching(T t, long j) throws Exception {
            EventHandler<T> eventHandler;
            NodeId nodeId = StripedDisruptor.this.sharedStripe ? StripedDisruptor.this.FAKE_NODE_ID : t.nodeId();
            T put = this.eventCache.put(nodeId, t);
            if (put == null || (eventHandler = this.subscribers.get(put.nodeId())) == null) {
                return;
            }
            if (StripedDisruptor.this.metrics != null && StripedDisruptor.this.metrics.enabled()) {
                StripedDisruptor.this.metrics.hitToStripe(this.stripeId);
                this.currentBatchSizes.compute(nodeId, (nodeId2, num) -> {
                    if (num == null) {
                        return 1;
                    }
                    return Integer.valueOf(num.intValue() + 1);
                });
            }
            eventHandler.onEvent(put, j, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/raft/jraft/disruptor/StripedDisruptor$StripeExceptionHandler.class */
    public class StripeExceptionHandler implements ExceptionHandler<T> {
        private final String name;
        private final ConcurrentHashMap<NodeId, BiConsumer<T, Throwable>> subscribers = new ConcurrentHashMap<>();

        StripeExceptionHandler(String str) {
            this.name = str;
        }

        void subscribe(NodeId nodeId, BiConsumer<T, Throwable> biConsumer) {
            this.subscribers.put(nodeId, biConsumer);
        }

        void unsubscribe(NodeId nodeId) {
            this.subscribers.remove(nodeId);
        }

        public void handleOnStartException(Throwable th) {
            StripedDisruptor.LOG.error("Fail to start disruptor [name={}]", th, new Object[]{this.name});
        }

        public void handleOnShutdownException(Throwable th) {
            StripedDisruptor.LOG.error("Fail to shutdown disruptor [name={}]", th, new Object[]{this.name});
        }

        public void handleEventException(Throwable th, long j, T t) {
            NodeId nodeId = t.nodeId();
            BiConsumer<T, Throwable> biConsumer = nodeId == null ? null : this.subscribers.get(nodeId);
            IgniteLogger igniteLogger = StripedDisruptor.LOG;
            Object[] objArr = new Object[3];
            objArr[0] = this.name;
            objArr[1] = t;
            objArr[2] = Boolean.valueOf(biConsumer != null);
            igniteLogger.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", th, objArr);
            if (biConsumer != null) {
                biConsumer.accept(t, th);
            }
        }
    }

    public static <U extends NodeIdAware> StripedDisruptor<U> createSerialDisruptor(String str, String str2, BiFunction<String, IgniteLogger, ThreadFactory> biFunction, int i, EventFactory<U> eventFactory, boolean z, @Nullable RaftMetricSource.DisruptorMetrics disruptorMetrics) {
        return new StripedDisruptor<>(str, str2, biFunction, i, eventFactory, 1, true, z, disruptorMetrics);
    }

    public StripedDisruptor(String str, String str2, BiFunction<String, IgniteLogger, ThreadFactory> biFunction, int i, EventFactory<T> eventFactory, int i2, boolean z, boolean z2, @Nullable RaftMetricSource.DisruptorMetrics disruptorMetrics) {
        this.disruptors = new Disruptor[i2];
        this.queues = new RingBuffer[i2];
        this.eventHandlers = new ArrayList<>(i2);
        this.exceptionHandlers = new ArrayList<>(i2);
        this.stripes = i2;
        this.name = NamedThreadFactory.threadPrefix(str, str2);
        this.sharedStripe = z;
        this.metrics = disruptorMetrics;
        for (int i3 = 0; i3 < i2; i3++) {
            Disruptor<T> build = DisruptorBuilder.newInstance().setRingBufferSize(i).setEventFactory(eventFactory).setThreadFactory(biFunction.apply(IgniteStringFormatter.format("{}_stripe_{}", new Object[]{str2, Integer.valueOf(i3)}), LOG)).setProducerType(ProducerType.MULTI).setWaitStrategy(z2 ? new YieldingWaitStrategy() : new BlockingWaitStrategy()).build();
            this.eventHandlers.add(new StripeEntryHandler(i3));
            this.exceptionHandlers.add(new StripeExceptionHandler(this.name));
            build.handleEventsWith(new EventHandler[]{this.eventHandlers.get(i3)});
            build.setDefaultExceptionHandler(this.exceptionHandlers.get(i3));
            this.queues[i3] = build.start();
            this.disruptors[i3] = build;
        }
    }

    public void shutdown() {
        for (int i = 0; i < this.stripes; i++) {
            if (this.disruptors[i] != null) {
                this.disruptors[i].shutdown();
            }
            this.queues[i] = null;
            this.disruptors[i] = null;
        }
        this.eventHandlers.clear();
        this.exceptionHandlers.clear();
    }

    public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> eventHandler) {
        return subscribe(nodeId, eventHandler, null);
    }

    public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> eventHandler, BiConsumer<T, Throwable> biConsumer) {
        if (!$assertionsDisabled && getStripe(nodeId) != -1) {
            throw new AssertionError("The double subscriber for the one replication group [nodeId=" + nodeId + "].");
        }
        int nextStripeToSubscribe = nextStripeToSubscribe();
        this.stripeMapper.put(nodeId, Integer.valueOf(nextStripeToSubscribe));
        this.queues[nextStripeToSubscribe].publishEvent((nodeIdAware, j) -> {
            nodeIdAware.reset();
            nodeIdAware.evtType = DisruptorEventType.SUBSCRIBE;
            nodeIdAware.nodeId = nodeId;
            nodeIdAware.handler = eventHandler;
        });
        if (biConsumer != null) {
            this.exceptionHandlers.get(nextStripeToSubscribe).subscribe(nodeId, biConsumer);
        }
        return this.queues[nextStripeToSubscribe];
    }

    public void unsubscribe(NodeId nodeId) {
        int stripe = getStripe(nodeId);
        if (!$assertionsDisabled && stripe == -1) {
            throw new AssertionError("The replication group has not subscribed yet [nodeId=" + nodeId + "].");
        }
        this.stripeMapper.remove(nodeId);
        this.queues[stripe].publishEvent((nodeIdAware, j) -> {
            nodeIdAware.reset();
            nodeIdAware.evtType = DisruptorEventType.SUBSCRIBE;
            nodeIdAware.nodeId = nodeId;
            nodeIdAware.handler = null;
        });
        this.exceptionHandlers.get(stripe).unsubscribe(nodeId);
    }

    public int getStripe(NodeId nodeId) {
        return this.stripeMapper.getOrDefault(nodeId, -1).intValue();
    }

    private int nextStripeToSubscribe() {
        return Math.abs(this.incrementalCounter.getAndIncrement() % this.stripes);
    }

    public String toString() {
        return IgniteStringFormatter.format("{} [name={}]", new Object[]{StripedDisruptor.class.getSimpleName(), this.name});
    }

    static {
        $assertionsDisabled = !StripedDisruptor.class.desiredAssertionStatus();
        LOG = Loggers.forClass(StripedDisruptor.class);
    }
}
