/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.raft.jraft.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.sources.RaftMetricSource;
import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.raft.jraft.disruptor.DisruptorBuilder;
import org.apache.ignite.raft.jraft.disruptor.DisruptorEventType;
import org.apache.ignite.raft.jraft.disruptor.NodeIdAware;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.jetbrains.annotations.Nullable;

public class StripedDisruptor<T extends NodeIdAware> {
    private final NodeId FAKE_NODE_ID = new NodeId(null, null);
    private static final IgniteLogger LOG = Loggers.forClass(StripedDisruptor.class);
    private ConcurrentHashMap<NodeId, Integer> stripeMapper = new ConcurrentHashMap();
    private final AtomicInteger incrementalCounter = new AtomicInteger();
    private final Disruptor<T>[] disruptors;
    private final RingBuffer<T>[] queues;
    private final ArrayList<StripeEntryHandler> eventHandlers;
    private final ArrayList<StripeExceptionHandler> exceptionHandlers;
    private final int stripes;
    private final String name;
    private final RaftMetricSource.DisruptorMetrics metrics;
    private final boolean sharedStripe;

    public static <U extends NodeIdAware> StripedDisruptor<U> createSerialDisruptor(String nodeName, String poolName, BiFunction<String, IgniteLogger, ThreadFactory> threadFactorySupplier, int bufferSize, EventFactory<U> eventFactory, boolean useYieldStrategy, @Nullable RaftMetricSource.DisruptorMetrics metrics) {
        return new StripedDisruptor<U>(nodeName, poolName, threadFactorySupplier, bufferSize, eventFactory, 1, true, useYieldStrategy, metrics);
    }

    public StripedDisruptor(String nodeName, String poolName, BiFunction<String, IgniteLogger, ThreadFactory> threadFactorySupplier, int bufferSize, EventFactory<T> eventFactory, int stripes, boolean sharedStripe, boolean useYieldStrategy, @Nullable RaftMetricSource.DisruptorMetrics raftMetrics) {
        this.disruptors = new Disruptor[stripes];
        this.queues = new RingBuffer[stripes];
        this.eventHandlers = new ArrayList(stripes);
        this.exceptionHandlers = new ArrayList(stripes);
        this.stripes = stripes;
        this.name = IgniteThread.threadPrefix((String)nodeName, (String)poolName);
        this.sharedStripe = sharedStripe;
        this.metrics = raftMetrics;
        for (int i = 0; i < stripes; ++i) {
            String stripeName = IgniteStringFormatter.format((String)"{}_stripe_{}", (Object[])new Object[]{poolName, i});
            Disruptor disruptor = DisruptorBuilder.newInstance().setRingBufferSize(bufferSize).setEventFactory(eventFactory).setThreadFactory(threadFactorySupplier.apply(stripeName, LOG)).setProducerType(ProducerType.MULTI).setWaitStrategy((WaitStrategy)(useYieldStrategy ? new YieldingWaitStrategy() : new BlockingWaitStrategy())).build();
            this.eventHandlers.add(new StripeEntryHandler(i));
            this.exceptionHandlers.add(new StripeExceptionHandler(this.name));
            disruptor.handleEventsWith(new EventHandler[]{this.eventHandlers.get(i)});
            disruptor.setDefaultExceptionHandler((ExceptionHandler)this.exceptionHandlers.get(i));
            this.queues[i] = disruptor.start();
            this.disruptors[i] = disruptor;
        }
    }

    public void shutdown() {
        for (int i = 0; i < this.stripes; ++i) {
            if (this.disruptors[i] != null) {
                this.disruptors[i].shutdown();
            }
            this.queues[i] = null;
            this.disruptors[i] = null;
        }
        this.eventHandlers.clear();
        this.exceptionHandlers.clear();
    }

    public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler) {
        return this.subscribe(nodeId, handler, null);
    }

    public RingBuffer<T> subscribe(NodeId nodeId, EventHandler<T> handler, BiConsumer<T, Throwable> exceptionHandler) {
        assert (this.getStripe(nodeId) == -1) : "The double subscriber for the one replication group [nodeId=" + nodeId + "].";
        int stripeId = this.nextStripeToSubscribe();
        this.stripeMapper.put(nodeId, stripeId);
        this.queues[stripeId].publishEvent((event, sequence) -> {
            event.reset();
            event.evtType = DisruptorEventType.SUBSCRIBE;
            event.nodeId = nodeId;
            event.handler = handler;
        });
        if (exceptionHandler != null) {
            this.exceptionHandlers.get(stripeId).subscribe(nodeId, exceptionHandler);
        }
        return this.queues[stripeId];
    }

    public void unsubscribe(NodeId nodeId) {
        int stripeId = this.getStripe(nodeId);
        assert (stripeId != -1) : "The replication group has not subscribed yet [nodeId=" + nodeId + "].";
        this.stripeMapper.remove(nodeId);
        this.queues[stripeId].publishEvent((event, sequence) -> {
            event.reset();
            event.evtType = DisruptorEventType.SUBSCRIBE;
            event.nodeId = nodeId;
            event.handler = null;
        });
        this.exceptionHandlers.get(stripeId).unsubscribe(nodeId);
    }

    public int getStripe(NodeId nodeId) {
        return this.stripeMapper.getOrDefault(nodeId, -1);
    }

    private int nextStripeToSubscribe() {
        return Math.abs(this.incrementalCounter.getAndIncrement() % this.stripes);
    }

    public String toString() {
        return IgniteStringFormatter.format((String)"{} [name={}]", (Object[])new Object[]{StripedDisruptor.class.getSimpleName(), this.name});
    }

    private class StripeEntryHandler
    implements EventHandler<T> {
        private final Map<NodeId, EventHandler<T>> subscribers = new HashMap();
        private final Map<NodeId, T> eventCache = new HashMap();
        private final Map<NodeId, Integer> currentBatchSizes = new HashMap<NodeId, Integer>();
        private final int stripeId;

        StripeEntryHandler(int stripeId) {
            this.stripeId = stripeId;
        }

        public void onEvent(T event, long sequence, boolean endOfBatch) throws Exception {
            if (((NodeIdAware)event).evtType == DisruptorEventType.SUBSCRIBE) {
                if (((NodeIdAware)event).handler == null) {
                    this.subscribers.remove(((NodeIdAware)event).nodeId());
                } else {
                    this.subscribers.put(((NodeIdAware)event).nodeId(), ((NodeIdAware)event).handler);
                }
            } else {
                this.internalBatching(event, sequence);
            }
            if (endOfBatch) {
                for (Map.Entry grpEvent : this.eventCache.entrySet()) {
                    EventHandler grpHandler = this.subscribers.get(((NodeIdAware)grpEvent.getValue()).nodeId());
                    if (grpHandler == null) continue;
                    if (StripedDisruptor.this.metrics != null && StripedDisruptor.this.metrics.enabled()) {
                        StripedDisruptor.this.metrics.hitToStripe(this.stripeId);
                        StripedDisruptor.this.metrics.addBatchSize(this.currentBatchSizes.getOrDefault(grpEvent.getKey(), 0) + 1);
                    }
                    grpHandler.onEvent((Object)((NodeIdAware)grpEvent.getValue()), sequence, true);
                }
                this.currentBatchSizes.clear();
                this.eventCache.clear();
            }
        }

        private void internalBatching(T event, long sequence) throws Exception {
            EventHandler grpHandler;
            NodeId pushNodeId = StripedDisruptor.this.sharedStripe ? StripedDisruptor.this.FAKE_NODE_ID : ((NodeIdAware)event).nodeId();
            NodeIdAware prevEvent = (NodeIdAware)this.eventCache.put(pushNodeId, event);
            if (prevEvent != null && (grpHandler = this.subscribers.get(prevEvent.nodeId())) != null) {
                if (StripedDisruptor.this.metrics != null && StripedDisruptor.this.metrics.enabled()) {
                    StripedDisruptor.this.metrics.hitToStripe(this.stripeId);
                    this.currentBatchSizes.compute(pushNodeId, (nodeId, cnt) -> {
                        if (cnt == null) {
                            return 1;
                        }
                        return cnt + 1;
                    });
                }
                grpHandler.onEvent((Object)prevEvent, sequence, false);
            }
        }
    }

    private class StripeExceptionHandler
    implements ExceptionHandler<T> {
        private final String name;
        private final ConcurrentHashMap<NodeId, BiConsumer<T, Throwable>> subscribers;

        StripeExceptionHandler(String name) {
            this.name = name;
            this.subscribers = new ConcurrentHashMap();
        }

        void subscribe(NodeId nodeId, BiConsumer<T, Throwable> handler) {
            this.subscribers.put(nodeId, handler);
        }

        void unsubscribe(NodeId nodeId) {
            this.subscribers.remove(nodeId);
        }

        public void handleOnStartException(Throwable ex) {
            LOG.error("Fail to start disruptor [name={}]", ex, new Object[]{this.name});
        }

        public void handleOnShutdownException(Throwable ex) {
            LOG.error("Fail to shutdown disruptor [name={}]", ex, new Object[]{this.name});
        }

        public void handleEventException(Throwable ex, long sequence, T event) {
            NodeId nodeId = ((NodeIdAware)event).nodeId();
            BiConsumer handler = nodeId == null ? null : this.subscribers.get(nodeId);
            LOG.error("Handle disruptor event error [name={}, event={}, hasHandler={}]", ex, new Object[]{this.name, event, handler != null});
            if (handler != null) {
                handler.accept(event, ex);
            }
        }
    }
}

