package org.apache.ignite.internal.processors.hadoop;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;

/* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.class */
public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest {
    /**
     * Map through which all counters below are obtained.
     * NOTE(review): presumably this lets the test and the task classes (which Hadoop may load separately)
     * observe the same instances - confirm against HadoopSharedMap.
     */
    private static final HadoopSharedMap m = HadoopSharedMap.map(HadoopTaskExecutionSelfTest.class);

    /** Line counter: incremented per record by {@code TestMapper} or increased by the sums in {@code TestReducer}. */
    private static final AtomicInteger totalLineCnt = (AtomicInteger) m.put("totalLineCnt", new AtomicInteger());

    /** Number of {@code CancellingTestMapper} instances that have gone through {@code setup}. */
    private static final AtomicInteger executedTasks = (AtomicInteger) m.put("executedTasks", new AtomicInteger());

    /** Number of {@code CancellingTestMapper} runs that ended with a {@code HadoopTaskCancelledException}. */
    private static final AtomicInteger cancelledTasks = (AtomicInteger) m.put("cancelledTasks", new AtomicInteger());

    /** Local working directory of a task mapped to the id of the task attempt that registered it. */
    @SuppressWarnings("unchecked")
    private static final Map<String, String> taskWorkDirs =
        (Map<String, String>) m.put("taskWorkDirs", new ConcurrentHashMap<String, String>());

    /** Id of the {@code CancellingTestMapper} that must fail; mapper ids start at 1, so 0 matches none. */
    private static final AtomicInteger failMapperId = (AtomicInteger) m.put("failMapperId", new AtomicInteger());

    /** Number of input splits computed by {@code InFormat} for the most recent job. */
    private static final AtomicInteger splitsCount = (AtomicInteger) m.put("splitsCount", new AtomicInteger());

    /** Job configuration flag: when {@code true}, {@code TestMapper} writes to the context instead of counting. */
    private static final String MAP_WRITE = "test.map.write";

    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest$CancellingTestMapper.class */
    private static class CancellingTestMapper extends Mapper<Object, Text, Text, IntWritable> {
        private int mapperId;

        private CancellingTestMapper() {
        }

        protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            this.mapperId = HadoopTaskExecutionSelfTest.executedTasks.incrementAndGet();
        }

        public void run(Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            try {
                super.run(context);
            } catch (HadoopTaskCancelledException e) {
                HadoopTaskExecutionSelfTest.cancelledTasks.incrementAndGet();
                throw e;
            }
        }

        protected void map(Object obj, Text text, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            if (this.mapperId == HadoopTaskExecutionSelfTest.failMapperId.get()) {
                throw new IOException();
            }
            Thread.sleep(1000L);
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map(obj, (Text) obj2, (Mapper<Object, Text, Text, IntWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest$FailMapper.class */
    private static class FailMapper extends Mapper<Object, Text, Text, IntWritable> {
        private FailMapper() {
        }

        protected void map(Object obj, Text text, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            throw new IOException("Expected");
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map(obj, (Text) obj2, (Mapper<Object, Text, Text, IntWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest$InFormat.class */
    private static class InFormat extends TextInputFormat {
        private InFormat() {
        }

        public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
            List<InputSplit> splits = super.getSplits(jobContext);
            HadoopTaskExecutionSelfTest.splitsCount.set(splits.size());
            X.println("___ split of input: " + HadoopTaskExecutionSelfTest.splitsCount.get(), new Object[0]);
            return splits;
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest$TestCombiner.class */
    private static class TestCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable sum = new IntWritable();

        private TestCombiner() {
        }

        protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            X.println("___ Combiner: ", new Object[0]);
        }

        protected void reduce(Text text, Iterable<IntWritable> iterable, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int i = 0;
            Iterator<IntWritable> it = iterable.iterator();
            while (it.hasNext()) {
                i += it.next().get();
            }
            this.sum.set(i);
            X.println("___ combo: " + i, new Object[0]);
            context.write(text, this.sum);
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<IntWritable>) iterable, (Reducer<Text, IntWritable, Text, IntWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest$TestMapper.class */
    private static class TestMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        public static final Text LINE_COUNT = new Text("lineCount");

        private TestMapper() {
        }

        protected void setup(Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            X.println("___ Mapper: " + context.getTaskAttemptID(), new Object[0]);
            TestCase.assertNull(HadoopTaskExecutionSelfTest.taskWorkDirs.put(FileSystem.getLocal(context.getConfiguration()).getWorkingDirectory().toString(), context.getTaskAttemptID().toString()));
        }

        protected void map(Object obj, Text text, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            if (context.getConfiguration().getBoolean(HadoopTaskExecutionSelfTest.MAP_WRITE, false)) {
                context.write(LINE_COUNT, ONE);
            } else {
                HadoopTaskExecutionSelfTest.totalLineCnt.incrementAndGet();
            }
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map(obj, (Text) obj2, (Mapper<Object, Text, Text, IntWritable>.Context) context);
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest$TestReducer.class */
    private static class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable sum = new IntWritable();

        private TestReducer() {
        }

        protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            X.println("___ Reducer: " + context.getTaskAttemptID(), new Object[0]);
            TestCase.assertNull(HadoopTaskExecutionSelfTest.taskWorkDirs.put(FileSystem.getLocal(context.getConfiguration()).getWorkingDirectory().toString(), context.getTaskAttemptID().toString()));
        }

        protected void reduce(Text text, Iterable<IntWritable> iterable, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int i = 0;
            for (IntWritable intWritable : iterable) {
                i += intWritable.get();
                X.println("___ rdcr: " + intWritable.get(), new Object[0]);
            }
            this.sum.set(i);
            context.write(text, this.sum);
            X.println("___ RDCR SUM: " + i, new Object[0]);
            HadoopTaskExecutionSelfTest.totalLineCnt.addAndGet(i);
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<IntWritable>) iterable, (Reducer<Text, IntWritable, Text, IntWritable>.Context) context);
        }
    }

    /**
     * Returns the parent's IGFS configuration with the fragmentizer switched off.
     *
     * @return File system configuration used by this test.
     */
    @Override
    public FileSystemConfiguration igfsConfiguration() {
        final FileSystemConfiguration cfg = super.igfsConfiguration();

        cfg.setFragmentizerEnabled(false);

        return cfg;
    }

    /**
     * Overrides the parent's flag to always answer {@code true}.
     * NOTE(review): presumably this makes the base class configure IGFS on the test nodes - confirm in
     * HadoopAbstractSelfTest.
     *
     * @return Always {@code true}.
     */
    @Override
    protected boolean igfsEnabled() {
        return true;
    }

    /**
     * Runs the parent's start-up logic and then starts {@code gridCount()} grids.
     * NOTE(review): the decompiler reported that this method's access modifier was changed from protected.
     *
     * @throws Exception If the parent logic or grid start-up fails.
     */
    @Override
    public void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        final int nodes = gridCount();

        startGrids(nodes);
    }

    /**
     * Stops all grids and then runs the parent's shutdown logic.
     * NOTE(review): the decompiler reported that this method's access modifier was changed from protected.
     *
     * @throws Exception If stopping the grids or the parent logic fails.
     */
    @Override
    public void afterTestsStopped() throws Exception {
        // Grids are stopped first; the parent's cleanup runs afterwards.
        stopAllGrids();
        super.afterTestsStopped();
    }

    /**
     * Formats the file system named {@code igfsName} on grid 0.
     * NOTE(review): presumably invoked by the test framework before every test method so that each test
     * starts with an empty file system - confirm against the base test class.
     *
     * @throws Exception If formatting fails.
     */
    protected void beforeTest() throws Exception {
        grid(0).fileSystem(igfsName).format();
    }

    /**
     * Returns the parent's Hadoop configuration with the maximum number of parallel tasks set to 5.
     *
     * @param str Argument passed through unchanged to the parent implementation (presumably the grid
     *      name - confirm in HadoopAbstractSelfTest).
     * @return Hadoop configuration used by this test.
     */
    @Override
    public HadoopConfiguration hadoopConfiguration(String str) {
        final HadoopConfiguration cfg = super.hadoopConfiguration(str);

        cfg.setMaxParallelTasks(5);

        return cfg;
    }

    /**
     * Runs a map-only job over a 10000-line file and checks that every line was counted and that
     * 32 distinct task working directories were registered.
     *
     * @throws Exception If failed.
     */
    public void testMapRun() throws Exception {
        final int lineCnt = 10000;

        prepareFile("/testFile", lineCnt);

        totalLineCnt.set(0);
        taskWorkDirs.clear();

        Configuration cfg = new Configuration();

        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());

        Job job = Job.getInstance(cfg);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(TestMapper.class);

        // Map-only job: no reduce phase.
        job.setNumReduceTasks(0);

        job.setInputFormatClass(TextInputFormat.class);

        Path inputDir = new Path("igfs://:" + getTestGridName(0) + "@/");
        Path outputDir = new Path("igfs://:" + getTestGridName(0) + "@/output/");

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.setJarByClass(getClass());

        IgniteInternalFuture fut = grid(0).hadoop().submit(
            new HadoopJobId(UUID.randomUUID(), 1),
            HadoopUtils.createJobInfo(job.getConfiguration()));

        fut.get();

        assertEquals(lineCnt, totalLineCnt.get());
        assertEquals(32, taskWorkDirs.size());
    }

    /**
     * Runs a map-combine-reduce job with two reducers over a 10001-line file and checks that the
     * reducers summed up every line and that 34 distinct task working directories were registered.
     * Afterwards waits for the job's finish future on every grid.
     *
     * @throws Exception If failed.
     */
    public void testMapCombineRun() throws Exception {
        final int lineCnt = 10001;

        prepareFile("/testFile", lineCnt);

        totalLineCnt.set(0);
        taskWorkDirs.clear();

        Configuration cfg = new Configuration();

        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());

        // Make the mapper emit records so that the combiner and the reducers have input.
        cfg.setBoolean(MAP_WRITE, true);

        Job job = Job.getInstance(cfg);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(TestMapper.class);
        job.setCombinerClass(TestCombiner.class);
        job.setReducerClass(TestReducer.class);

        job.setNumReduceTasks(2);

        job.setInputFormatClass(TextInputFormat.class);

        Path inputDir = new Path("igfs://:" + getTestGridName(0) + "@/");
        Path outputDir = new Path("igfs://:" + getTestGridName(0) + "@/output");

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.setJarByClass(getClass());

        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 2);

        IgniteInternalFuture fut = grid(0).hadoop().submit(jobId, HadoopUtils.createJobInfo(job.getConfiguration()));

        fut.get();

        assertEquals(lineCnt, totalLineCnt.get());
        assertEquals(34, taskWorkDirs.size());

        for (int gridIdx = 0; gridIdx < gridCount(); gridIdx++) {
            grid(gridIdx).hadoop().finishFuture(jobId).get();
        }
    }

    /**
     * Submits a map-only job whose mapper always throws and checks that waiting for the job fails
     * with an {@link IgniteCheckedException}.
     *
     * @throws Exception If failed.
     */
    public void testMapperException() throws Exception {
        prepareFile("/testFile", 1000);

        Configuration cfg = new Configuration();

        cfg.setStrings("fs.igfs.impl", IgniteHadoopFileSystem.class.getName());

        Job job = Job.getInstance(cfg);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(FailMapper.class);

        job.setNumReduceTasks(0);

        job.setInputFormatClass(TextInputFormat.class);

        Path inputDir = new Path("igfs://:" + getTestGridName(0) + "@/");
        Path outputDir = new Path("igfs://:" + getTestGridName(0) + "@/output/");

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.setJarByClass(getClass());

        final IgniteInternalFuture fut = grid(0).hadoop().submit(
            new HadoopJobId(UUID.randomUUID(), 3),
            HadoopUtils.createJobInfo(job.getConfiguration()));

        Callable<Object> waitForJob = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                fut.get();

                return null;
            }
        };

        GridTestUtils.assertThrows(log, waitForJob, IgniteCheckedException.class, (String) null);
    }

    /**
     * Creates (overwriting if present) a file in the file system named {@code igfsName} on grid 0 and
     * fills it with the given number of identical text lines.
     *
     * @param str Path of the file to create.
     * @param i Number of lines to write.
     * @throws IOException If writing to the file failed.
     * @throws Exception If the file could not be created or closed.
     */
    private void prepareFile(String str, int i) throws Exception {
        // try-with-resources closes the stream on every path and keeps close() failures as suppressed.
        try (IgfsOutputStream out = grid(0).fileSystem(igfsName).create(new IgfsPath(str), true)) {
            PrintWriter writer = new PrintWriter(new OutputStreamWriter(out));

            for (int line = 0; line < i; line++) {
                writer.print("Hello, Hadoop map-reduce!\n");
            }

            // PrintWriter never throws on I/O errors; checkError() flushes and reports whether one occurred.
            if (writer.checkError()) {
                throw new IOException("Failed to write test file: " + str);
            }
        }
    }

    /**
     * Prepares a map-only job for the cancellation tests: writes a 1500-line input file, resets the
     * task counters and configures a job that uses {@code CancellingTestMapper} and {@code InFormat}.
     *
     * @return Configuration of the prepared job.
     * @throws Exception If failed.
     */
    private Configuration prepareJobForCancelling() throws Exception {
        prepareFile("/testFile", 1500);

        // Reset everything the cancelling mapper and the input format report into.
        executedTasks.set(0);
        cancelledTasks.set(0);
        failMapperId.set(0);
        splitsCount.set(0);

        Configuration cfg = new Configuration();

        setupFileSystems(cfg);

        Job job = Job.getInstance(cfg);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(CancellingTestMapper.class);

        job.setNumReduceTasks(0);

        job.setInputFormatClass(InFormat.class);

        Path inputDir = new Path("igfs://:" + getTestGridName(0) + "@/");
        Path outputDir = new Path("igfs://:" + getTestGridName(0) + "@/output/");

        FileInputFormat.setInputPaths(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);

        job.setJarByClass(getClass());

        return job.getConfiguration();
    }

    /**
     * Checks that the failure of one mapper cancels all the others: waits until every split has a
     * running mapper, makes mapper 1 fail and expects the job to fail with an
     * {@link IgniteCheckedException} and every other started mapper to have been cancelled.
     *
     * @throws Exception If failed.
     */
    public void testTaskCancelling() throws Exception {
        Configuration cfg = prepareJobForCancelling();

        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

        final IgniteInternalFuture fut = grid(0).hadoop().submit(jobId, HadoopUtils.createJobInfo(cfg));

        boolean splitsReady = GridTestUtils.waitForCondition(new GridAbsPredicate() {
            public boolean apply() {
                return HadoopTaskExecutionSelfTest.splitsCount.get() > 0;
            }
        }, 20000L);

        if (!splitsReady) {
            U.dumpThreads(log);

            fail("Timed out waiting for the input splits to be computed.");
        }

        boolean allStarted = GridTestUtils.waitForCondition(new GridAbsPredicate() {
            public boolean apply() {
                return HadoopTaskExecutionSelfTest.executedTasks.get() == HadoopTaskExecutionSelfTest.splitsCount.get();
            }
        }, 20000L);

        if (!allStarted) {
            U.dumpThreads(log);

            fail("Timed out waiting for a mapper to be started for every split.");
        }

        // Mapper ids start at 1, so this makes exactly one of the running mappers throw.
        failMapperId.set(1);

        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                fut.get();

                return null;
            }
        }, IgniteCheckedException.class, (String) null);

        // Every started mapper except the failed one must have been cancelled.
        assertEquals(executedTasks.get(), cancelledTasks.get() + 1);
    }

    /**
     * Checks job killing: killing an unknown job returns {@code false}; killing a job whose mappers
     * are all running returns {@code true}, makes the job fail with an {@link IgniteCheckedException}
     * and cancels every started mapper; killing the same job again returns {@code false}.
     *
     * @throws Exception If failed.
     */
    public void testJobKill() throws Exception {
        Configuration cfg = prepareJobForCancelling();

        Hadoop hadoop = grid(0).hadoop();

        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

        // The job has not been submitted yet, so there is nothing to kill.
        assertFalse(hadoop.kill(jobId));

        final IgniteInternalFuture fut = hadoop.submit(jobId, HadoopUtils.createJobInfo(cfg));

        boolean splitsReady = GridTestUtils.waitForCondition(new GridAbsPredicate() {
            public boolean apply() {
                return HadoopTaskExecutionSelfTest.splitsCount.get() > 0;
            }
        }, 20000L);

        if (!splitsReady) {
            U.dumpThreads(log);

            fail("Timed out waiting for the input splits to be computed.");
        }

        boolean allStarted = GridTestUtils.waitForCondition(new GridAbsPredicate() {
            public boolean apply() {
                X.println("___ executed tasks: " + HadoopTaskExecutionSelfTest.executedTasks.get(), new Object[0]);

                return HadoopTaskExecutionSelfTest.executedTasks.get() == HadoopTaskExecutionSelfTest.splitsCount.get();
            }
        }, 20000L);

        if (!allStarted) {
            U.dumpThreads(log);

            fail("Timed out waiting for a mapper to be started for every split.");
        }

        assertTrue(hadoop.kill(jobId));

        GridTestUtils.assertThrows(log, new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                fut.get();

                return null;
            }
        }, IgniteCheckedException.class, (String) null);

        // No mapper failed on its own, so every started mapper must have been cancelled.
        assertEquals(executedTasks.get(), cancelledTasks.get());

        // The job is already finished; a second kill has no effect.
        assertFalse(hadoop.kill(jobId));
    }
}
