package org.apache.ignite.internal.processors.hadoop;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
 * Job tracker self test.
 * <p>
 * Each test submits a map-reduce job whose map, combine and reduce tasks block on latches owned by
 * the test. The test releases the latches one stage at a time and, after every step, checks on all
 * grid nodes whether the job finish future is (or is not yet) done, and finally verifies how many
 * tasks of each kind were executed.
 */
public class HadoopJobTrackerSelfTest extends HadoopAbstractSelfTest {
    /** Output path prefix; every test appends its own suffix. */
    private static final String PATH_OUTPUT = "/test-out";

    /** Number of input splits produced by {@link InFormat}. */
    private static final int BLOCK_CNT = 10;

    /** Key of the latch awaited by {@link TestMapper}. */
    private static final String MAP_LATCH = "mapAwaitLatch";

    /** Key of the latch awaited by {@link TestCombiner}. */
    private static final String COMBINE_LATCH = "combineAwaitLatch";

    /** Key of the latch awaited by {@link TestReducer}. */
    private static final String REDUCE_LATCH = "reduceAwaitLatch";

    /** All latch keys, in the order in which they are released on test completion. */
    private static final String[] LATCH_NAMES = {MAP_LATCH, COMBINE_LATCH, REDUCE_LATCH};

    /**
     * Map obtained from {@link HadoopSharedMap} for this class. The values returned by its
     * {@code put} (not necessarily the instances passed in) are the ones used by this class.
     * NOTE(review): presumably this lets copies of this class loaded by other class loaders share
     * the same counters and latches - confirm against HadoopSharedMap.
     */
    private static final HadoopSharedMap sharedMap = HadoopSharedMap.map(HadoopJobTrackerSelfTest.class);

    /** Number of completed {@link TestMapper} runs. */
    private static final AtomicInteger mapExecCnt =
        (AtomicInteger) sharedMap.put("mapExecCnt", new AtomicInteger());

    /** Number of completed {@link TestReducer} runs. */
    private static final AtomicInteger reduceExecCnt =
        (AtomicInteger) sharedMap.put("reduceExecCnt", new AtomicInteger());

    /** Number of completed {@link TestCombiner} runs. */
    private static final AtomicInteger combineExecCnt =
        (AtomicInteger) sharedMap.put("combineExecCnt", new AtomicInteger());

    /** Latches by key; re-created before every test, see {@link #beforeTest()}. */
    @SuppressWarnings("unchecked")
    private static final Map<String, CountDownLatch> latch =
        (Map<String, CountDownLatch>) sharedMap.put("latch", new HashMap<String, CountDownLatch>());

    /**
     * Input format that produces {@link #BLOCK_CNT} one-host splits of a dummy file and a record
     * reader that never yields a record.
     */
    public static class InFormat extends InputFormat {
        /**
         * Creates {@link #BLOCK_CNT} splits of the file {@code someFile}; split {@code i} starts at
         * offset {@code i}, has length {@code i + 1} and is located on {@code localhost}.
         *
         * @param jobContext Job context (not used).
         * @return Created splits.
         * @throws IOException If the file URI cannot be parsed.
         * @throws InterruptedException Declared by the overridden signature; not thrown here.
         */
        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
            // The path is the same for every split, so it is built (and validated) only once.
            final Path file;

            try {
                file = new Path(new URI("someFile"));
            } catch (URISyntaxException e) {
                throw new IOException(e);
            }

            List<InputSplit> splits = new ArrayList<>(BLOCK_CNT);

            for (int i = 0; i < BLOCK_CNT; i++) {
                splits.add(new FileSplit(file, i, i + 1, new String[] {"localhost"}));
            }

            return splits;
        }

        /**
         * Creates a record reader that reports no records at all.
         *
         * @param inputSplit Split to read (not used).
         * @param taskAttemptContext Task context (not used).
         * @return Empty record reader.
         * @throws IOException Declared by the overridden signature; not thrown here.
         * @throws InterruptedException Declared by the overridden signature; not thrown here.
         */
        public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            return new RecordReader() {
                public void initialize(InputSplit split, TaskAttemptContext ctx) {
                    // No-op.
                }

                public boolean nextKeyValue() {
                    // There is never a next record.
                    return false;
                }

                public Object getCurrentKey() {
                    return null;
                }

                public Object getCurrentValue() {
                    return null;
                }

                public float getProgress() {
                    return 0.0f;
                }

                public void close() {
                    // No-op.
                }
            };
        }
    }

    /**
     * Combiner that waits for the {@link #COMBINE_LATCH} latch and then increments the combine counter.
     */
    private static class TestCombiner extends Reducer {
        /**
         * @param context Task context, used only to obtain the task ID for logging.
         * @throws IOException Declared by the overridden signature; not thrown here.
         * @throws InterruptedException If interrupted while waiting for the latch.
         */
        public void run(Reducer.Context context) throws IOException, InterruptedException {
            runBlockedTask(context.getTaskAttemptID().getTaskID().getId(), COMBINE_LATCH, combineExecCnt);
        }
    }

    /**
     * Mapper that waits for the {@link #MAP_LATCH} latch and then increments the map counter.
     */
    private static class TestMapper extends Mapper {
        /**
         * @param context Task context, used only to obtain the task ID for logging.
         * @throws IOException Declared by the overridden signature; not thrown here.
         * @throws InterruptedException If interrupted while waiting for the latch.
         */
        public void run(Mapper.Context context) throws IOException, InterruptedException {
            runBlockedTask(context.getTaskAttemptID().getTaskID().getId(), MAP_LATCH, mapExecCnt);
        }
    }

    /**
     * Reducer that waits for the {@link #REDUCE_LATCH} latch and then increments the reduce counter.
     */
    private static class TestReducer extends Reducer {
        /**
         * @param context Task context, used only to obtain the task ID for logging.
         * @throws IOException Declared by the overridden signature; not thrown here.
         * @throws InterruptedException If interrupted while waiting for the latch.
         */
        public void run(Reducer.Context context) throws IOException, InterruptedException {
            runBlockedTask(context.getTaskAttemptID().getTaskID().getId(), REDUCE_LATCH, reduceExecCnt);
        }
    }

    /**
     * Common body of all test tasks: logs the start, blocks until the given latch is released,
     * increments the given counter and logs the completion.
     *
     * @param taskId Task ID used in the log messages.
     * @param latchName Key of the latch to wait for.
     * @param execCnt Counter to increment once the latch is released.
     * @throws InterruptedException If interrupted while waiting for the latch.
     */
    private static void runBlockedTask(int taskId, String latchName, AtomicInteger execCnt)
        throws InterruptedException {
        System.out.println("Running task: " + taskId);

        latch.get(latchName).await();

        execCnt.incrementAndGet();

        System.out.println("Completed task: " + taskId);
    }

    /**
     * Counts down every latch so that no task stays blocked once a test is over.
     */
    private static void releaseLatches() {
        for (String name : LATCH_NAMES) {
            latch.get(name).countDown();
        }
    }

    /** {@inheritDoc} */
    @Override
    protected boolean igfsEnabled() {
        return true;
    }

    /**
     * Starts {@code gridCount()} grids after the superclass initialization.
     * NOTE(review): a decompiler note on the previous revision says the original modifier was
     * {@code protected}; the wider visibility is kept for compatibility.
     *
     * @throws Exception If failed.
     */
    @Override
    public void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        startGrids(gridCount());
    }

    /**
     * Stops all grids before the superclass clean-up.
     * NOTE(review): a decompiler note on the previous revision says the original modifier was
     * {@code protected}; the wider visibility is kept for compatibility.
     *
     * @throws Exception If failed.
     */
    @Override
    public void afterTestsStopped() throws Exception {
        stopAllGrids();

        super.afterTestsStopped();
    }

    /**
     * Installs a fresh single-count latch for every task kind.
     *
     * @throws Exception If failed.
     */
    protected void beforeTest() throws Exception {
        for (String name : LATCH_NAMES) {
            latch.put(name, new CountDownLatch(1));
        }
    }

    /**
     * Resets all task execution counters to zero.
     *
     * @throws Exception If failed.
     */
    protected void afterTest() throws Exception {
        mapExecCnt.set(0);
        combineExecCnt.set(0);
        reduceExecCnt.set(0);
    }

    /**
     * Returns the superclass configuration with the map-reduce planner replaced by
     * {@link HadoopTestRoundRobinMrPlanner}.
     *
     * @param gridName Value forwarded unchanged to the superclass (presumably the grid name - confirm).
     * @return Hadoop configuration.
     */
    @Override
    public HadoopConfiguration hadoopConfiguration(String gridName) {
        HadoopConfiguration cfg = super.hadoopConfiguration(gridName);

        cfg.setMapReducePlanner(new HadoopTestRoundRobinMrPlanner());

        return cfg;
    }

    /**
     * Submits a job without a combiner and checks that it finishes only after both the map and the
     * reduce latches are released, having run {@link #BLOCK_CNT} mappers, no combiners and one reducer.
     *
     * @throws Exception If failed.
     */
    public void testSimpleTaskSubmit() throws Exception {
        try {
            UUID globalId = UUID.randomUUID();

            Job job = Job.getInstance();

            setupFileSystems(job.getConfiguration());

            job.setMapperClass(TestMapper.class);
            job.setReducerClass(TestReducer.class);
            job.setInputFormatClass(InFormat.class);

            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "1"));

            HadoopJobId jobId = new HadoopJobId(globalId, 1);

            grid(0).hadoop().submit(jobId, HadoopUtils.createJobInfo(job.getConfiguration()));

            checkStatus(jobId, false);

            info("Releasing map latch.");

            latch.get(MAP_LATCH).countDown();

            checkStatus(jobId, false);

            info("Releasing reduce latch.");

            latch.get(REDUCE_LATCH).countDown();

            checkStatus(jobId, true);

            assertEquals(BLOCK_CNT, mapExecCnt.get());
            assertEquals(0, combineExecCnt.get());
            assertEquals(1, reduceExecCnt.get());
        } finally {
            // Runs on success and on failure alike, so blocked tasks never outlive the test.
            releaseLatches();
        }
    }

    /**
     * Submits a job with a combiner and checks that it finishes only after the map, combine and
     * reduce latches are released in turn, having run {@link #BLOCK_CNT} mappers, {@link #BLOCK_CNT}
     * combiners and one reducer.
     *
     * @throws Exception If failed.
     */
    public void testTaskWithCombinerPerMap() throws Exception {
        try {
            UUID globalId = UUID.randomUUID();

            Job job = Job.getInstance();

            setupFileSystems(job.getConfiguration());

            job.setMapperClass(TestMapper.class);
            job.setReducerClass(TestReducer.class);
            job.setCombinerClass(TestCombiner.class);
            job.setInputFormatClass(InFormat.class);

            FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + PATH_OUTPUT + "2"));

            HadoopJobId jobId = new HadoopJobId(globalId, 1);

            grid(0).hadoop().submit(jobId, HadoopUtils.createJobInfo(job.getConfiguration()));

            checkStatus(jobId, false);

            info("Releasing map latch.");

            latch.get(MAP_LATCH).countDown();

            checkStatus(jobId, false);

            // Give the job a moment to progress; no reducer may have completed while the
            // combine and reduce latches are still held.
            U.sleep(50L);

            assertEquals(0, reduceExecCnt.get());

            info("Releasing combiner latch.");

            latch.get(COMBINE_LATCH).countDown();

            checkStatus(jobId, false);

            info("Releasing reduce latch.");

            latch.get(REDUCE_LATCH).countDown();

            checkStatus(jobId, true);

            assertEquals(BLOCK_CNT, mapExecCnt.get());
            assertEquals(BLOCK_CNT, combineExecCnt.get());
            assertEquals(1, reduceExecCnt.get());
        } finally {
            // Runs on success and on failure alike, so blocked tasks never outlive the test.
            releaseLatches();
        }
    }

    /**
     * Checks the job on every grid node: the job status must be available and the finish future
     * must either complete (when {@code complete} is {@code true}) or not be done yet.
     *
     * @param jobId Job ID.
     * @param complete If {@code true}, waits for the finish future on each node; otherwise asserts
     *      that the future is not done.
     * @throws Exception If failed.
     */
    private void checkStatus(HadoopJobId jobId, boolean complete) throws Exception {
        for (int i = 0; i < gridCount(); i++) {
            IgniteKernal kernal = grid(i);

            Hadoop hadoop = kernal.hadoop();

            HadoopJobStatus stat = hadoop.status(jobId);

            assert stat != null;

            IgniteInternalFuture<?> fut = hadoop.finishFuture(jobId);

            if (!complete) {
                assertFalse(fut.isDone());

                continue;
            }

            info("Waiting for status future completion on node [idx=" + i + ", nodeId=" + kernal.getLocalNodeId() + ']');

            fut.get();
        }
    }
}
