package org.apache.ignite.internal.processors.hadoop.shuffle;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;

/* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.class */
public class HadoopShuffleJob<T> implements AutoCloseable {
    private static final int MSG_BUF_SIZE = 131072;
    private final HadoopJob job;
    private final GridUnsafeMemory mem;
    private final boolean needPartitioner;
    private T[] reduceAddrs;
    private final T locReduceAddr;
    private final HadoopShuffleMessage[] msgs;
    private final AtomicReferenceArray<HadoopMultimap> maps;
    private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
    private volatile GridWorker snd;
    private volatile boolean flushed;
    private final IgniteLogger log;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap();
    protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs = new ConcurrentHashMap();
    private final CountDownLatch ioInitLatch = new CountDownLatch(1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob$6, reason: invalid class name */
    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob$6.class */
    public static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$ignite$internal$processors$hadoop$HadoopTaskType = new int[HadoopTaskType.values().length];

        static {
            try {
                $SwitchMap$org$apache$ignite$internal$processors$hadoop$HadoopTaskType[HadoopTaskType.MAP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$ignite$internal$processors$hadoop$HadoopTaskType[HadoopTaskType.COMBINE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$ignite$internal$processors$hadoop$HadoopTaskType[HadoopTaskType.REDUCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob$PartitionedOutput.class */
    public class PartitionedOutput implements HadoopTaskOutput {
        private final HadoopTaskOutput[] adders;
        private HadoopPartitioner partitioner;
        private final HadoopTaskContext taskCtx;

        private PartitionedOutput(HadoopTaskContext hadoopTaskContext) throws IgniteCheckedException {
            this.adders = new HadoopTaskOutput[HadoopShuffleJob.this.maps.length()];
            this.taskCtx = hadoopTaskContext;
            if (HadoopShuffleJob.this.needPartitioner) {
                this.partitioner = hadoopTaskContext.partitioner();
            }
        }

        public void write(Object obj, Object obj2) throws IgniteCheckedException {
            int i = 0;
            if (this.partitioner != null) {
                i = this.partitioner.partition(obj, obj2, this.adders.length);
                if (i < 0 || i >= this.adders.length) {
                    throw new IgniteCheckedException("Invalid partition: " + i);
                }
            }
            HadoopTaskOutput hadoopTaskOutput = this.adders[i];
            if (hadoopTaskOutput == null) {
                HadoopMultimap.Adder startAdding = HadoopShuffleJob.this.getOrCreateMap(HadoopShuffleJob.this.maps, i).startAdding(this.taskCtx);
                hadoopTaskOutput = startAdding;
                this.adders[i] = startAdding;
            }
            hadoopTaskOutput.write(obj, obj2);
        }

        public void close() throws IgniteCheckedException {
            for (HadoopTaskOutput hadoopTaskOutput : this.adders) {
                if (hadoopTaskOutput != null) {
                    hadoopTaskOutput.close();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob$UnsafeValue.class */
    public static class UnsafeValue implements HadoopMultimap.Value {
        private final byte[] buf;
        private int off;
        private int size;
        static final /* synthetic */ boolean $assertionsDisabled;

        private UnsafeValue(byte[] bArr) {
            if (!$assertionsDisabled && bArr == null) {
                throw new AssertionError();
            }
            this.buf = bArr;
        }

        @Override // org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap.Value
        public int size() {
            return this.size;
        }

        @Override // org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap.Value
        public void copyTo(long j) {
            GridUnsafeMemory.UNSAFE.copyMemory(this.buf, GridUnsafeMemory.BYTE_ARR_OFF + this.off, (Object) null, j, this.size);
        }

        static {
            $assertionsDisabled = !HadoopShuffleJob.class.desiredAssertionStatus();
        }
    }

    public HadoopShuffleJob(T t, IgniteLogger igniteLogger, HadoopJob hadoopJob, GridUnsafeMemory gridUnsafeMemory, int i, int[] iArr) throws IgniteCheckedException {
        this.locReduceAddr = t;
        this.job = hadoopJob;
        this.mem = gridUnsafeMemory;
        this.log = igniteLogger.getLogger(HadoopShuffleJob.class);
        if (!F.isEmpty(iArr)) {
            for (int i2 : iArr) {
                this.reducersCtx.put(Integer.valueOf(i2), hadoopJob.getTaskContext(new HadoopTaskInfo(HadoopTaskType.REDUCE, hadoopJob.id(), i2, 0, (HadoopInputSplit) null)));
            }
        }
        this.needPartitioner = i > 1;
        this.maps = new AtomicReferenceArray<>(i);
        this.msgs = new HadoopShuffleMessage[i];
    }

    public boolean initializeReduceAddresses(T[] tArr) {
        if (this.reduceAddrs != null) {
            return false;
        }
        this.reduceAddrs = tArr;
        return true;
    }

    public boolean reducersInitialized() {
        return this.reduceAddrs != null;
    }

    public void startSending(String str, IgniteInClosure2X<T, HadoopShuffleMessage> igniteInClosure2X) {
        if (!$assertionsDisabled && this.snd != null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && igniteInClosure2X == null) {
            throw new AssertionError();
        }
        this.io = igniteInClosure2X;
        if (!this.flushed) {
            this.snd = new GridWorker(str, "hadoop-shuffle-" + this.job.id(), this.log) { // from class: org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob.1
                protected void body() throws InterruptedException {
                    while (!isCancelled()) {
                        try {
                            Thread.sleep(5L);
                            HadoopShuffleJob.this.collectUpdatesAndSend(false);
                        } catch (IgniteCheckedException e) {
                            throw new IllegalStateException((Throwable) e);
                        }
                    }
                }
            };
            new IgniteThread(this.snd).start();
        }
        this.ioInitLatch.countDown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> atomicReferenceArray, int i) {
        HadoopMultimap hadoopMultimap = atomicReferenceArray.get(i);
        if (hadoopMultimap == null) {
            hadoopMultimap = HadoopJobProperty.get(this.job.info(), HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING, false) ? new HadoopConcurrentHashMultimap(this.job.info(), this.mem, HadoopJobProperty.get(this.job.info(), HadoopJobProperty.PARTITION_HASHMAP_SIZE, 8192)) : new HadoopSkipList(this.job.info(), this.mem);
            if (!atomicReferenceArray.compareAndSet(i, null, hadoopMultimap)) {
                hadoopMultimap.close();
                return atomicReferenceArray.get(i);
            }
        }
        return hadoopMultimap;
    }

    public void onShuffleMessage(HadoopShuffleMessage hadoopShuffleMessage) throws IgniteCheckedException {
        if (!$assertionsDisabled && hadoopShuffleMessage.buffer() == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && hadoopShuffleMessage.offset() <= 0) {
            throw new AssertionError();
        }
        HadoopTaskContext hadoopTaskContext = this.reducersCtx.get(Integer.valueOf(hadoopShuffleMessage.reducer()));
        HadoopPerformanceCounter.getCounter(hadoopTaskContext.counters(), null).onShuffleMessage(hadoopShuffleMessage.reducer(), U.currentTimeMillis());
        final HadoopMultimap.Adder startAdding = getOrCreateMap(this.maps, hadoopShuffleMessage.reducer()).startAdding(hadoopTaskContext);
        Throwable th = null;
        try {
            try {
                final GridUnsafeDataInput gridUnsafeDataInput = new GridUnsafeDataInput();
                final UnsafeValue unsafeValue = new UnsafeValue(hadoopShuffleMessage.buffer());
                hadoopShuffleMessage.visit(new HadoopShuffleMessage.Visitor() { // from class: org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob.2
                    private HadoopMultimap.Key key;

                    @Override // org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage.Visitor
                    public void onKey(byte[] bArr, int i, int i2) throws IgniteCheckedException {
                        gridUnsafeDataInput.bytes(bArr, i, i + i2);
                        this.key = startAdding.addKey(gridUnsafeDataInput, this.key);
                    }

                    @Override // org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage.Visitor
                    public void onValue(byte[] bArr, int i, int i2) {
                        unsafeValue.off = i;
                        unsafeValue.size = i2;
                        this.key.add(unsafeValue);
                    }
                });
                if (startAdding != null) {
                    if (0 == 0) {
                        startAdding.close();
                        return;
                    }
                    try {
                        startAdding.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (startAdding != null) {
                if (th != null) {
                    try {
                        startAdding.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    startAdding.close();
                }
            }
            throw th4;
        }
    }

    public void onShuffleAck(HadoopShuffleAck hadoopShuffleAck) {
        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> igniteBiTuple = this.sentMsgs.get(Long.valueOf(hadoopShuffleAck.id()));
        if (igniteBiTuple != null) {
            ((GridFutureAdapter) igniteBiTuple.get2()).onDone();
        } else {
            this.log.warning("Received shuffle ack for not registered shuffle id: " + hadoopShuffleAck);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void collectUpdatesAndSend(boolean z) throws IgniteCheckedException {
        for (int i = 0; i < this.maps.length(); i++) {
            HadoopMultimap hadoopMultimap = this.maps.get(i);
            if (hadoopMultimap != null && !this.locReduceAddr.equals(this.reduceAddrs[i])) {
                if (this.msgs[i] == null) {
                    this.msgs[i] = new HadoopShuffleMessage(this.job.id(), i, MSG_BUF_SIZE);
                }
                final int i2 = i;
                hadoopMultimap.visit(false, new HadoopMultimap.Visitor() { // from class: org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob.3
                    private long keyPtr;
                    private int keySize;
                    private boolean keyAdded;

                    @Override // org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap.Visitor
                    public void onKey(long j, int i3) {
                        this.keyPtr = j;
                        this.keySize = i3;
                        this.keyAdded = false;
                    }

                    private boolean tryAdd(long j, int i3) {
                        HadoopShuffleMessage hadoopShuffleMessage = HadoopShuffleJob.this.msgs[i2];
                        if (this.keyAdded) {
                            if (!hadoopShuffleMessage.available(i3, true)) {
                                return false;
                            }
                            hadoopShuffleMessage.addValue(j, i3);
                            return true;
                        }
                        if (!hadoopShuffleMessage.available(this.keySize + i3, false)) {
                            return false;
                        }
                        hadoopShuffleMessage.addKey(this.keyPtr, this.keySize);
                        hadoopShuffleMessage.addValue(j, i3);
                        this.keyAdded = true;
                        return true;
                    }

                    @Override // org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap.Visitor
                    public void onValue(long j, int i3) {
                        if (tryAdd(j, i3)) {
                            return;
                        }
                        HadoopShuffleJob.this.send(i2, this.keySize + i3);
                        this.keyAdded = false;
                        if (!tryAdd(j, i3)) {
                            throw new IllegalStateException();
                        }
                    }
                });
                if (z && this.msgs[i].offset() != 0) {
                    send(i, 0);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void send(int i, int i2) {
        GridFutureAdapter gridFutureAdapter = new GridFutureAdapter();
        HadoopShuffleMessage hadoopShuffleMessage = this.msgs[i];
        final long id = hadoopShuffleMessage.id();
        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> putIfAbsent = this.sentMsgs.putIfAbsent(Long.valueOf(id), new IgniteBiTuple<>(hadoopShuffleMessage, gridFutureAdapter));
        if (!$assertionsDisabled && putIfAbsent != null) {
            throw new AssertionError();
        }
        try {
            this.io.apply(this.reduceAddrs[i], hadoopShuffleMessage);
        } catch (GridClosureException e) {
            gridFutureAdapter.onDone(U.unwrap(e));
        }
        gridFutureAdapter.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { // from class: org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob.4
            public void apply(IgniteInternalFuture<?> igniteInternalFuture) {
                try {
                    igniteInternalFuture.get();
                    HadoopShuffleJob.this.sentMsgs.remove(Long.valueOf(id));
                } catch (IgniteCheckedException e2) {
                    HadoopShuffleJob.this.log.error("Failed to send message.", e2);
                }
            }
        });
        this.msgs[i] = i2 == 0 ? null : new HadoopShuffleMessage(this.job.id(), i, Math.max(MSG_BUF_SIZE, i2));
    }

    @Override // java.lang.AutoCloseable
    public void close() throws IgniteCheckedException {
        if (this.snd != null) {
            this.snd.cancel();
            try {
                this.snd.join();
            } catch (InterruptedException e) {
                throw new IgniteInterruptedCheckedException(e);
            }
        }
        close(this.maps);
    }

    private void close(AtomicReferenceArray<HadoopMultimap> atomicReferenceArray) {
        for (int i = 0; i < atomicReferenceArray.length(); i++) {
            HadoopMultimap hadoopMultimap = atomicReferenceArray.get(i);
            if (hadoopMultimap != null) {
                hadoopMultimap.close();
            }
        }
    }

    public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("Flushing job " + this.job.id() + " on address " + this.locReduceAddr);
        }
        this.flushed = true;
        if (this.maps.length() == 0) {
            return new GridFinishedFuture();
        }
        U.await(this.ioInitLatch);
        GridWorker gridWorker = this.snd;
        if (gridWorker != null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Cancelling sender thread.");
            }
            gridWorker.cancel();
            try {
                gridWorker.join();
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + this.job.id());
                }
            } catch (InterruptedException e) {
                throw new IgniteInterruptedCheckedException(e);
            }
        }
        collectUpdatesAndSend(true);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Finished sending collected updates to remote reducers: " + this.job.id());
        }
        GridCompoundFuture gridCompoundFuture = new GridCompoundFuture();
        Iterator<IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> it = this.sentMsgs.values().iterator();
        while (it.hasNext()) {
            gridCompoundFuture.add((IgniteInternalFuture) it.next().get2());
        }
        gridCompoundFuture.markInitialized();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Collected futures to compound futures for flush: " + this.sentMsgs.size());
        }
        return gridCompoundFuture;
    }

    public HadoopTaskOutput output(HadoopTaskContext hadoopTaskContext) throws IgniteCheckedException {
        switch (AnonymousClass6.$SwitchMap$org$apache$ignite$internal$processors$hadoop$HadoopTaskType[hadoopTaskContext.taskInfo().type().ordinal()]) {
            case 1:
                if (!$assertionsDisabled && this.job.info().hasCombiner()) {
                    throw new AssertionError("The output creation is allowed if combiner has not been defined.");
                }
                break;
            case 2:
                break;
            default:
                throw new IllegalStateException("Illegal type: " + hadoopTaskContext.taskInfo().type());
        }
        return new PartitionedOutput(hadoopTaskContext);
    }

    public HadoopTaskInput input(HadoopTaskContext hadoopTaskContext) throws IgniteCheckedException {
        switch (AnonymousClass6.$SwitchMap$org$apache$ignite$internal$processors$hadoop$HadoopTaskType[hadoopTaskContext.taskInfo().type().ordinal()]) {
            case 3:
                HadoopMultimap hadoopMultimap = this.maps.get(hadoopTaskContext.taskInfo().taskNumber());
                return hadoopMultimap != null ? hadoopMultimap.input(hadoopTaskContext) : new HadoopTaskInput() { // from class: org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob.5
                    public boolean next() {
                        return false;
                    }

                    public Object key() {
                        throw new IllegalStateException();
                    }

                    public Iterator<?> values() {
                        throw new IllegalStateException();
                    }

                    public void close() {
                    }
                };
            default:
                throw new IllegalStateException("Illegal type: " + hadoopTaskContext.taskInfo().type());
        }
    }

    static {
        $assertionsDisabled = !HadoopShuffleJob.class.desiredAssertionStatus();
    }
}
