package org.apache.ignite.internal.processors.hadoop.impl;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.HadoopConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount1;
import org.apache.ignite.internal.processors.hadoop.impl.examples.HadoopWordCount2;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;

/**
 * Base class for Hadoop MapReduce word-count tests executed on an Ignite grid whose IGFS
 * is configured with a secondary IGFS file system.
 * <p>
 * Before each test a dedicated "secondary" Ignite node with its own IGFS is started; the
 * primary IGFS configuration is then pointed at it through {@link #SECONDARY_URI}.
 * {@link #doTest} submits a word-count job, validates the recorded performance counters,
 * the owner of the produced files and the job output.
 */
public class HadoopAbstractMapReduceTest extends HadoopAbstractWordCountTest {
    /** IGFS block size in bytes. */
    protected static final int IGFS_BLOCK_SIZE = 524288;

    /** Amount of blocks to prefetch. */
    protected static final int PREFETCH_BLOCKS = 1;

    /** Amount of sequential block reads before prefetch is triggered. */
    protected static final int SEQ_READS_BEFORE_PREFETCH = 2;

    /** Secondary file system URI. */
    protected static final String SECONDARY_URI = "igfs://igfs-secondary@127.0.0.1:11500/";

    /** Secondary file system configuration path. */
    protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml";

    /** The user the Hadoop job is run on behalf of. */
    protected static final String USER = "vasya";

    /** Name of the secondary IGFS. */
    protected static final String SECONDARY_IGFS_NAME = "igfs-secondary";

    /** Expected count of the word "red" in the job output. */
    protected static final int red = 10000;

    /** Expected count of the word "blue" in the job output. */
    protected static final int blue = 20000;

    /** Expected count of the word "green" in the job output. */
    protected static final int green = 15000;

    /** Expected count of the word "yellow" in the job output. */
    protected static final int yellow = 7000;

    /** Ignite node hosting the secondary IGFS; started in {@link #beforeTest()}. */
    protected Ignite igniteSecondary;

    /** Secondary file system handle; created in {@link #igfsConfiguration()}. */
    protected IgfsSecondaryFileSystem secondaryFs;

    /** {@inheritDoc} */
    @Override
    public int gridCount() {
        return 3;
    }

    /**
     * Reads the {@code "usrName"} property of a file in the given IGFS, acting as {@link #USER}.
     *
     * @param igfsEx IGFS to query.
     * @param igfsPath Path of the file to inspect.
     * @return Value of the {@code "usrName"} property of the file.
     */
    private static String getOwner(final IgfsEx igfsEx, final IgfsPath igfsPath) {
        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
            @Override
            public String apply() {
                IgfsFile info = igfsEx.info(igfsPath);

                assert info != null;

                return info.property("usrName");
            }
        });
    }

    /**
     * Reads the {@code "usrName"} property of a file in the secondary file system, acting as {@link #USER}.
     *
     * @param igfsSecondaryFileSystem Secondary file system to query.
     * @param igfsPath Path of the file to inspect.
     * @return Value of the {@code "usrName"} property of the file.
     */
    private static String getOwnerSecondary(final IgfsSecondaryFileSystem igfsSecondaryFileSystem,
        final IgfsPath igfsPath) {
        return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() {
            @Override
            public String apply() {
                return igfsSecondaryFileSystem.info(igfsPath).property("usrName");
            }
        });
    }

    /**
     * Asserts that the file is owned by {@link #USER} both in the primary IGFS and in the secondary file system.
     *
     * @param igfsPath Path of the file to check.
     */
    private void checkOwner(IgfsPath igfsPath) {
        assertEquals(USER, getOwner(igfs, igfsPath));
        assertEquals(USER, getOwnerSecondary(secondaryFs, igfsPath));
    }

    /**
     * Runs the word-count job over the given input file and verifies job statistics, ownership of the
     * produced files and the job output.
     *
     * @param inFile Input file in IGFS.
     * @param useNewMapper Whether the new-API mapper is used (the old-API one is configured otherwise).
     * @param useNewCombiner Whether the new-API combiner is used (the old-API one is configured otherwise).
     * @param useNewReducer Whether the new-API reducer is used (the old-API one is configured otherwise).
     * @throws Exception If the job submission or any of the checks fails.
     */
    public final void doTest(IgfsPath inFile, boolean useNewMapper, boolean useNewCombiner, boolean useNewReducer)
        throws Exception {
        log.info("useNewMapper=" + useNewMapper + ", useNewCombiner=" + useNewCombiner
            + ", useNewReducer=" + useNewReducer);

        String outputDir = "/output";

        // Remove the output of a previous run, if any.
        igfs.delete(new IgfsPath(outputDir), true);

        JobConf jobConf = new JobConf();

        jobConf.set("ignite.counters.writer", IgniteHadoopFileSystemCounterWriter.class.getName());
        jobConf.setUser(USER);
        jobConf.set("ignite.counters.fswriter.directory", "/xxx/${USER}/zzz");

        // Small split and block sizes so that the input is processed by several map tasks.
        jobConf.setInt("mapreduce.input.fileinputformat.split.maxsize", 65000);
        jobConf.setInt("fs.local.block.size", 65000);

        setupFileSystems(jobConf);

        // Old-API task classes are configured for every stage that does not use the new API.
        HadoopWordCount1.setTasksClasses(jobConf, !useNewMapper, !useNewCombiner, !useNewReducer);

        Job job = Job.getInstance(jobConf);

        HadoopWordCount2.setTasksClasses(job, useNewMapper, useNewCombiner, useNewReducer, compressOutputSnappy());

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(igfsScheme() + inFile.toString()));
        FileOutputFormat.setOutputPath(job, new Path(igfsScheme() + outputDir));

        job.setJarByClass(HadoopWordCount2.class);

        HadoopJobId jobId = new HadoopJobId(UUID.randomUUID(), 1);

        grid(0).hadoop().submit(jobId, HadoopUtils.createJobInfo(job.getConfiguration(), (byte[]) null)).get();

        checkJobStatistics(jobId);

        // The new-API reducer writes "part-r-NNNNN" files, the old-API one writes "part-NNNNN".
        String outFile = outputDir + "/" + (useNewReducer ? "part-r-" : "part-") + "00000";

        checkOwner(new IgfsPath(outputDir + "/_SUCCESS"));
        checkOwner(new IgfsPath(outFile));

        String expected =
            "blue\t" + blue + "\n"
                + "green\t" + green + "\n"
                + "red\t" + red + "\n"
                + "yellow\t" + yellow + "\n";

        assertEquals(
            "Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner
                + ", new reducer: " + useNewReducer,
            expected,
            readAndSortFile(outFile, job.getConfiguration()));
    }

    /**
     * Tells whether the job output should be compressed with Snappy.
     *
     * @return {@code false} in this implementation; subclasses may override.
     */
    protected boolean compressOutputSnappy() {
        return false;
    }

    /**
     * Validates the performance counter of the job: every recorded event must belong to a known phase,
     * phase timestamps of each task must be non-decreasing in phase order, and the statistics file
     * written to IGFS must eventually contain as many events as the counter reported.
     *
     * @param jobId Job identifier.
     * @throws IgniteCheckedException If obtaining counters or waiting for a condition fails.
     * @throws IOException If the statistics file cannot be read.
     */
    private void checkJobStatistics(HadoopJobId jobId) throws IgniteCheckedException, IOException {
        HadoopPerformanceCounter perfCntr =
            HadoopPerformanceCounter.getCounter(grid(0).hadoop().counters(jobId), (UUID) null);

        // Task id -> (phase ordinal -> timestamp), both levels sorted.
        Map<String, SortedMap<Integer, Long>> tasks = new TreeMap<>();

        // Expected ordering of the phases within a task.
        Map<String, Integer> phaseOrders = new HashMap<>();

        phaseOrders.put("submit", 0);
        phaseOrders.put("prepare", 1);
        phaseOrders.put("start", 2);
        phaseOrders.put("Cstart", 3);
        phaseOrders.put("finish", 4);

        String prevTaskId = null;
        long apiEvtCnt = 0;

        for (T2<String, Long> evt : perfCntr.evts()) {
            // Event name is space-separated: "JOB <phase> ..." or "<TYPE> <taskNum> <phase> ...".
            String[] parsedEvt = evt.get1().split(" ");

            String taskId;
            String taskPhase;

            if ("JOB".equals(parsedEvt[0])) {
                taskId = parsedEvt[0];
                taskPhase = parsedEvt[1];
            } else {
                // Combiner events are attributed to the map task with the same number; their phases get a "C" prefix.
                boolean combine = "COMBINE".equals(parsedEvt[0]);

                taskId = (combine ? "MAP" : parsedEvt[0].substring(0, 3)) + parsedEvt[1];
                taskPhase = (combine ? "C" : "") + parsedEvt[2];
            }

            if (!taskId.equals(prevTaskId)) {
                tasks.put(taskId, new TreeMap<Integer, Long>());
            }

            Integer phaseOrder = phaseOrders.get(taskPhase);

            assertNotNull("Invalid phase " + taskPhase, phaseOrder);

            tasks.get(taskId).put(phaseOrder, evt.get2());

            prevTaskId = taskId;
            apiEvtCnt++;
        }

        for (Map.Entry<String, SortedMap<Integer, Long>> task : tasks.entrySet()) {
            long prevTime = 0;

            for (long phaseTime : task.getValue().values()) {
                assertTrue("Phase order of " + task.getKey() + " is invalid", phaseTime >= prevTime);

                prevTime = phaseTime;
            }
        }

        final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance");

        // The wait must run regardless of whether JVM assertions are enabled.
        boolean statFileExists = GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override
            public boolean apply() {
                return igfs.exists(statPath);
            }
        }, 20000L);

        if (!statFileExists) {
            throw new AssertionError("Job statistics file was not created [path=" + statPath + ']');
        }

        final long expEvtCnt = apiEvtCnt;

        boolean evtCntMatched = GridTestUtils.waitForCondition(new GridAbsPredicate() {
            @Override
            public boolean apply() {
                try {
                    return expEvtCnt == readStatEventCount(statPath);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 10000L);

        if (!evtCntMatched) {
            throw new AssertionError("Invalid API events count [exp=" + expEvtCnt
                + ", actual=" + readStatEventCount(statPath) + ']');
        }
    }

    /**
     * Opens the job statistics file and returns the value computed for it by
     * {@code HadoopTestUtils.simpleCheckJobStatFile}; the reader is always closed.
     *
     * @param statPath Path of the statistics file in IGFS.
     * @return Event count reported for the statistics file.
     * @throws IOException If reading or closing the file fails.
     */
    private long readStatEventCount(IgfsPath statPath) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(igfs.open(statPath)))) {
            return HadoopTestUtils.simpleCheckJobStatFile(reader);
        }
    }

    /**
     * Starts the node with the secondary IGFS and then performs the inherited preparation.
     *
     * @throws Exception If the node start or the inherited preparation fails.
     */
    @Override
    public void beforeTest() throws Exception {
        igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, IgfsMode.PRIMARY, null,
            SECONDARY_REST_CFG);

        super.beforeTest();
    }

    /**
     * Starts an Ignite node with a single IGFS configured.
     *
     * @param gridName Ignite instance name.
     * @param fsName IGFS name.
     * @param mode Default IGFS mode.
     * @param secondaryFileSystem Secondary file system, or {@code null} if none.
     * @param restCfg IPC endpoint configuration, or {@code null} if none.
     * @return Started Ignite instance.
     * @throws Exception If the node fails to start.
     */
    protected Ignite startGridWithIgfs(String gridName, String fsName, IgfsMode mode,
        @Nullable IgfsSecondaryFileSystem secondaryFileSystem, @Nullable IgfsIpcEndpointConfiguration restCfg)
        throws Exception {
        FileSystemConfiguration igfsCfg = new FileSystemConfiguration();

        igfsCfg.setName(fsName);
        igfsCfg.setBlockSize(IGFS_BLOCK_SIZE);
        igfsCfg.setDefaultMode(mode);
        igfsCfg.setIpcEndpointConfiguration(restCfg);
        igfsCfg.setSecondaryFileSystem(secondaryFileSystem);
        igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS);
        igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH);

        CacheConfiguration dataCacheCfg = defaultCacheConfiguration();

        dataCacheCfg.setName("dataCache");
        dataCacheCfg.setCacheMode(CacheMode.PARTITIONED);
        dataCacheCfg.setNearConfiguration(null);
        dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2));
        dataCacheCfg.setBackups(0);
        dataCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        CacheConfiguration metaCacheCfg = defaultCacheConfiguration();

        metaCacheCfg.setName("metaCache");
        metaCacheCfg.setCacheMode(CacheMode.REPLICATED);
        metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
        metaCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

        igfsCfg.setDataCacheConfiguration(dataCacheCfg);
        igfsCfg.setMetaCacheConfiguration(metaCacheCfg);

        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setIgniteInstanceName(gridName);

        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();

        discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true));

        cfg.setDiscoverySpi(discoSpi);
        cfg.setFileSystemConfiguration(igfsCfg);
        cfg.setLocalHost("127.0.0.1");
        cfg.setConnectorConfiguration(null);

        HadoopConfiguration hadoopCfg = createHadoopConfiguration();

        if (hadoopCfg != null) {
            cfg.setHadoopConfiguration(hadoopCfg);
        }

        return G.start(cfg);
    }

    /**
     * Creates the Hadoop configuration applied to the node started by {@link #startGridWithIgfs}.
     *
     * @return {@code null} in this implementation, meaning no Hadoop configuration is set; subclasses may override.
     */
    protected HadoopConfiguration createHadoopConfiguration() {
        return null;
    }

    /**
     * Extends the inherited IGFS configuration with a secondary file system created from
     * {@link #SECONDARY_URI} and {@link #SECONDARY_CFG}; the created instance is stored in {@link #secondaryFs}.
     *
     * @return IGFS configuration with the secondary file system set.
     * @throws Exception If the inherited configuration cannot be created.
     */
    @Override
    public FileSystemConfiguration igfsConfiguration() throws Exception {
        FileSystemConfiguration igfsCfg = super.igfsConfiguration();

        secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG);

        igfsCfg.setSecondaryFileSystem(secondaryFs);

        return igfsCfg;
    }
}
