package org.apache.ignite.internal.processors.hadoop.shuffle;

import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
import org.apache.ignite.internal.processors.hadoop.HadoopContext;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;

/* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.class */
/**
 * Hadoop component that owns the per-job shuffle state ({@link HadoopShuffleJob}) of the local node.
 * <p>
 * It registers message listeners on start, routes incoming shuffle messages to the job they belong to,
 * creates shuffle jobs lazily on first access and closes them when the job finishes or the component stops.
 */
public class HadoopShuffle extends HadoopComponent {
    /** Shuffle jobs keyed by job ID. */
    private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();

    /** Unsafe memory instance handed to every shuffle job created by this component. */
    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);

    /** Mutex serializing lazy creation of shuffle jobs in {@link #job(HadoopJobId)}. */
    private final Object mux = new Object();

    /**
     * Starts the component and registers listeners for both message channels used by the shuffle:
     * {@link GridTopic#TOPIC_HADOOP_MSG} for messages sent through the internal IO API and
     * {@link GridTopic#TOPIC_HADOOP} for messages sent as user messages.
     *
     * @param hadoopContext Hadoop context.
     * @throws IgniteCheckedException If the parent component failed to start.
     */
    @Override
    public void start(HadoopContext hadoopContext) throws IgniteCheckedException {
        super.start(hadoopContext);

        hadoopContext.kernalContext().io().addMessageListener(GridTopic.TOPIC_HADOOP_MSG, new GridMessageListener() {
            @Override
            public void onMessage(UUID uuid, Object obj) {
                onMessageReceived(uuid, (HadoopMessage) obj);
            }
        });

        hadoopContext.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP, new IgniteBiPredicate<UUID, Object>() {
            @Override
            public boolean apply(UUID uuid, Object obj) {
                return onMessageReceived(uuid, (HadoopMessage) obj);
            }
        });
    }

    /**
     * Closes every known shuffle job and forgets them. A failure to close one job is logged and does not
     * prevent the remaining jobs from being closed.
     *
     * @param z Cancel flag (not used by this component).
     */
    @Override
    public void stop(boolean z) {
        for (HadoopShuffleJob<UUID> shuffleJob : this.jobs.values()) {
            try {
                shuffleJob.close();
            } catch (IgniteCheckedException e) {
                U.error(this.log, "Failed to close job.", e);
            }
        }

        this.jobs.clear();
    }

    /**
     * Creates a shuffle job for the given job ID and initializes its reducer addresses from the map-reduce plan.
     *
     * @param hadoopJobId Job ID.
     * @return Newly created shuffle job (not yet registered in {@link #jobs}).
     * @throws IgniteCheckedException If the job could not be created.
     */
    private HadoopShuffleJob<UUID> newJob(HadoopJobId hadoopJobId) throws IgniteCheckedException {
        HadoopMapReducePlan plan = this.ctx.jobTracker().plan(hadoopJobId);

        HadoopShuffleJob<UUID> shuffleJob = new HadoopShuffleJob<>(this.ctx.localNodeId(), this.log,
            this.ctx.jobTracker().job(hadoopJobId, null), this.mem, plan.reducers(),
            plan.reducers(this.ctx.localNodeId()), localMappersCount(plan), true);

        // Reducer index -> ID of the node the plan assigned that reducer to.
        UUID[] reducerNodes = new UUID[plan.reducers()];

        for (int i = 0; i < reducerNodes.length; i++) {
            UUID nodeId = plan.nodeForReducer(i);

            assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';

            reducerNodes[i] = nodeId;
        }

        // The call must stay outside the assert so it is executed even when assertions are disabled.
        boolean initialized = shuffleJob.initializeReduceAddresses(reducerNodes);

        assert initialized;

        return shuffleJob;
    }

    /**
     * Counts the mappers the plan assigns to the local node.
     *
     * @param hadoopMapReducePlan Map-reduce plan.
     * @return Number of local mappers, or {@code 0} if the plan has none for this node.
     */
    private int localMappersCount(HadoopMapReducePlan hadoopMapReducePlan) {
        Collection<?> mappers = hadoopMapReducePlan.mappers(this.ctx.localNodeId());

        return F.isEmpty(mappers) ? 0 : mappers.size();
    }

    /**
     * Sends a shuffle message to the given node. {@link Message} instances go through the internal IO API on
     * {@link GridTopic#TOPIC_HADOOP_MSG}; any other object is sent as a user message on
     * {@link GridTopic#TOPIC_HADOOP}.
     * <p>
     * NOTE: the decompiler reported that this method was originally private; it is kept public here so the
     * visible interface of the class does not change. It is only called from within this class.
     *
     * @param uuid Destination node ID.
     * @param obj Message to send.
     * @throws IgniteCheckedException If the destination node is not known to discovery or sending failed.
     */
    public void send0(UUID uuid, Object obj) throws IgniteCheckedException {
        ClusterNode node = this.ctx.kernalContext().discovery().node(uuid);

        // Discovery has no such node (presumably it left the topology): fail with a descriptive checked
        // exception instead of handing null to the IO manager.
        if (node == null) {
            throw new IgniteCheckedException("Failed to send Hadoop shuffle message, destination node is not " +
                "available [nodeId=" + uuid + ']');
        }

        if (obj instanceof Message) {
            this.ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message) obj, (byte) 0);
        } else {
            this.ctx.kernalContext().io().sendUserMessage(F.asList(node), obj, GridTopic.TOPIC_HADOOP, false, 0L, false);
        }
    }

    /**
     * Returns the shuffle job for the given ID, creating and registering it on first access. A newly
     * registered job whose reducers are already initialized starts sending immediately.
     *
     * @param hadoopJobId Job ID.
     * @return Shuffle job.
     * @throws IgniteCheckedException If the job could not be created.
     */
    private HadoopShuffleJob<UUID> job(HadoopJobId hadoopJobId) throws IgniteCheckedException {
        HadoopShuffleJob<UUID> res = this.jobs.get(hadoopJobId);

        if (res != null) {
            return res;
        }

        synchronized (this.mux) {
            // Re-check under the lock: another thread may have created the job in the meantime.
            res = this.jobs.get(hadoopJobId);

            if (res == null) {
                res = newJob(hadoopJobId);

                HadoopShuffleJob<UUID> old = this.jobs.putIfAbsent(hadoopJobId, res);

                if (old != null) {
                    // Lost the race: release the job we just built and use the registered one.
                    res.close();

                    res = old;
                } else if (res.reducersInitialized()) {
                    startSending(res);
                }
            }
        }

        return res;
    }

    /**
     * Starts the sending side of the given shuffle job, using {@link #send0(UUID, Object)} as the transport.
     *
     * @param hadoopShuffleJob Shuffle job.
     */
    private void startSending(HadoopShuffleJob<UUID> hadoopShuffleJob) {
        hadoopShuffleJob.startSending(this.ctx.kernalContext().gridName(), new IgniteInClosure2X<UUID, HadoopMessage>() {
            @Override
            public void applyx(UUID uuid, HadoopMessage hadoopMessage) throws IgniteCheckedException {
                send0(uuid, hadoopMessage);
            }
        });
    }

    /**
     * Dispatches an incoming shuffle message to the shuffle job it belongs to.
     *
     * @param uuid ID of the node the message came from.
     * @param hadoopMessage Received message.
     * @return Always {@code true}, including when handling failed with an {@link IgniteCheckedException}
     *      (the failure is logged). Presumably this keeps the user-message listener registered.
     * @throws IllegalStateException If the message type is not a known shuffle message.
     */
    public boolean onMessageReceived(UUID uuid, HadoopMessage hadoopMessage) {
        try {
            if (hadoopMessage instanceof HadoopShuffleMessage) {
                HadoopShuffleMessage msg = (HadoopShuffleMessage) hadoopMessage;

                job(msg.jobId()).onShuffleMessage(uuid, msg);
            } else if (hadoopMessage instanceof HadoopDirectShuffleMessage) {
                HadoopDirectShuffleMessage msg = (HadoopDirectShuffleMessage) hadoopMessage;

                job(msg.jobId()).onDirectShuffleMessage(uuid, msg);
            } else if (hadoopMessage instanceof HadoopShuffleAck) {
                HadoopShuffleAck ack = (HadoopShuffleAck) hadoopMessage;

                job(ack.jobId()).onShuffleAck(ack);
            } else if (hadoopMessage instanceof HadoopShuffleFinishRequest) {
                HadoopShuffleFinishRequest req = (HadoopShuffleFinishRequest) hadoopMessage;

                job(req.jobId()).onShuffleFinishRequest(uuid, req);
            } else if (hadoopMessage instanceof HadoopShuffleFinishResponse) {
                HadoopShuffleFinishResponse res = (HadoopShuffleFinishResponse) hadoopMessage;

                job(res.jobId()).onShuffleFinishResponse(uuid);
            } else {
                throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + uuid + ", msg=" + hadoopMessage + ']');
            }
        } catch (IgniteCheckedException e) {
            U.error(this.log, "Message handling failed.", e);
        }

        return true;
    }

    /**
     * Obtains the shuffle output for a task, creating the shuffle job if needed.
     *
     * @param hadoopTaskContext Task context.
     * @return Task output provided by the task's shuffle job.
     * @throws IgniteCheckedException If failed.
     */
    public HadoopTaskOutput output(HadoopTaskContext hadoopTaskContext) throws IgniteCheckedException {
        return job(hadoopTaskContext.taskInfo().jobId()).output(hadoopTaskContext);
    }

    /**
     * Obtains the shuffle input for a task, creating the shuffle job if needed.
     *
     * @param hadoopTaskContext Task context.
     * @return Task input provided by the task's shuffle job.
     * @throws IgniteCheckedException If failed.
     */
    public HadoopTaskInput input(HadoopTaskContext hadoopTaskContext) throws IgniteCheckedException {
        return job(hadoopTaskContext.taskInfo().jobId()).input(hadoopTaskContext);
    }

    /**
     * Unregisters and closes the shuffle job of a finished job. Does nothing if no shuffle job was created
     * for the ID; a failure to close is logged.
     *
     * @param hadoopJobId Job ID.
     */
    public void jobFinished(HadoopJobId hadoopJobId) {
        HadoopShuffleJob<UUID> removed = this.jobs.remove(hadoopJobId);

        if (removed == null) {
            return;
        }

        try {
            removed.close();
        } catch (IgniteCheckedException e) {
            U.error(this.log, "Failed to close job: " + hadoopJobId, e);
        }
    }

    /**
     * Flushes the shuffle job with the given ID.
     *
     * @param hadoopJobId Job ID.
     * @return Future of the flush; an already finished future if there is no shuffle job for the ID, or a
     *      future finished with the error if the flush could not be started.
     */
    public IgniteInternalFuture<?> flush(HadoopJobId hadoopJobId) {
        HadoopShuffleJob<UUID> shuffleJob = this.jobs.get(hadoopJobId);

        if (shuffleJob == null) {
            return new GridFinishedFuture<>();
        }

        try {
            return shuffleJob.flush();
        } catch (IgniteCheckedException e) {
            return new GridFinishedFuture<>(e);
        }
    }

    /**
     * @return Unsafe memory instance used by shuffle jobs of this component.
     */
    public GridUnsafeMemory memory() {
        return this.mem;
    }
}
