How to convert a MapReduce job to an Oozie job

This article uses the well-known "pi" program as an example, explaining the steps to convert a MapReduce job to an Oozie job using the map-reduce action node. The original command line is:

hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-0.20-mapreduce/hadoop-examples.jar pi 1 1

1. Find and read the source code.

First, we need to find the correct Java source code and make sure we understand the logic.
For this example, the source code lives in the hadoop-mapreduce-examples project on GitHub.
Per ExampleDriver.java, the "pi" program maps to QuasiMonteCarlo.java.
The most important method is "estimatePi", and below is the key snippet (the same method also sets the input and output paths, which we will need later):
    // job configuration -- each call maps to a property in workflow.xml (see step 3.5)
    job.setJarByClass(QuasiMonteCarlo.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(BooleanWritable.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    job.setMapperClass(QmcMapper.class);
    job.setReducerClass(QmcReducer.class);
    job.setNumReduceTasks(1);
    job.setSpeculativeExecution(false);
    // input/output paths, also set in estimatePi
    FileInputFormat.setInputPaths(job, inDir);
    FileOutputFormat.setOutputPath(job, outDir);

2. Learn from the oozie-examples.tar.gz shipped with the Oozie binary.

For example, CDH5 ships it at:
/opt/cloudera/parcels/CDH/share/doc/oozie-4.0.0+cdh5.0.2+180/oozie-examples.tar.gz
Play with the examples above, and learn from the Oozie documentation.
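For instance, a typical way to stage them (a sketch; paths follow this CDH5 layout, and the tarball unpacks to an "examples" directory in the stock Oozie distribution):
# Extract the shipped examples into a local working directory:
mkdir -p /root/oozie && cd /root/oozie
tar -xzf /opt/cloudera/parcels/CDH/share/doc/oozie-4.0.0+cdh5.0.2+180/oozie-examples.tar.gz
# Upload the extracted "examples" directory to HDFS so the stock workflows can run:
hdfs dfs -put examples /user/root/examples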

3. Prepare Configuration Files

3.1 Create a local directory named "map-reduce_pi" with two files -- job.properties and workflow.xml -- and one sub-directory -- lib.

[root@admin map-reduce_pi]# ls
job.properties  lib  workflow.xml

3.2 job.properties

job.properties normally specifies the namenode (the nameservice if HA is enabled), the jobtracker (or the YARN ResourceManager), the resource queue name, and other parameters used by workflow.xml.
It should stay on the local filesystem.
nameNode=hdfs://nameservice1
jobTracker=hdw2.xxx.com:8032
queueName=default
examplesRoot=examples
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/map-reduce_pi
outputDir=map-reduce_pi

3.3 Put the needed jar file in the lib sub-directory.

[root@admin lib]# ls
hadoop-mapreduce-examples-2.3.0-cdh5.0.2.jar

3.4 Identify the fully qualified class names.

From the source code above, we identified the most important classes. Now we need to determine their fully qualified names and make sure all required jar files are included.
The Mapper and Reducer classes are in hadoop-mapreduce-examples-2.3.0-cdh5.0.2.jar in the "lib" sub-directory:
[root@admin lib]# jar tvf hadoop-mapreduce-examples-2.3.0-cdh5.0.2.jar|grep -i QuasiMonteCarlo|egrep -i "reduce|map"
  4510 Mon Jun 09 09:31:30 PDT 2014 org/apache/hadoop/examples/QuasiMonteCarlo$QmcReducer.class
  3027 Mon Jun 09 09:31:30 PDT 2014 org/apache/hadoop/examples/QuasiMonteCarlo$QmcMapper.class
So the fully qualified names of these two classes are:
org.apache.hadoop.examples.QuasiMonteCarlo$QmcMapper
org.apache.hadoop.examples.QuasiMonteCarlo$QmcReducer
The fully qualified names of the other needed classes can be found in the import section of the source code (the SequenceFile input/output formats used above are imported there as well):
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

3.5 workflow.xml

Here I am using the new API, so some property names differ from the old API.
To make things simpler next time, I created the mapping below between Java calls and property names in workflow.xml.

Java function                   | Property in workflow.xml
--------------------------------|---------------------------------------------
job.setMapperClass              | mapreduce.map.class
job.setReducerClass             | mapreduce.reduce.class
job.setInputFormatClass         | mapreduce.job.inputformat.class
job.setOutputFormatClass        | mapreduce.job.outputformat.class
job.setOutputKeyClass           | mapreduce.job.output.key.class
job.setOutputValueClass         | mapreduce.job.output.value.class
FileInputFormat.setInputPaths   | mapreduce.input.fileinputformat.inputdir
FileOutputFormat.setOutputPath  | mapreduce.output.fileoutputformat.outputdir

This is the sample workflow.xml:
<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wf-pi">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="/user/root/outputforpi"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>org.apache.hadoop.examples.QuasiMonteCarlo$QmcMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>org.apache.hadoop.examples.QuasiMonteCarlo$QmcReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>10</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.map.tasks.speculative.execution</name>
                    <value>false</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks.speculative.execution</name>
                    <value>false</value>
                </property>
                <property>
                    <name>mapreduce.job.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat</value>
                </property>
                <property>
                    <name>mapreduce.job.output.key.class</name>
                    <value>org.apache.hadoop.io.BooleanWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.output.value.class</name>
                    <value>org.apache.hadoop.io.LongWritable</value>
                </property>
                <property>
                    <name>mapreduce.input.fileinputformat.inputdir</name>
                    <value>/user/root/inputforpi</value>
                </property>
                <property>
                    <name>mapreduce.output.fileoutputformat.outputdir</name>
                    <value>/user/root/outputforpi</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce pi failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
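
Before deploying, you can sanity-check the workflow definition against the Oozie schema with the oozie CLI (assuming the file sits under /root/oozie/examples/apps/map-reduce_pi, as used in step 4):
oozie validate /root/oozie/examples/apps/map-reduce_pi/workflow.xml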

4. Deploy

Upload the whole "map-reduce_pi" directory to HDFS:
hdfs dfs -rmr /user/root/examples/apps/map-reduce_pi
hdfs dfs -put /root/oozie/examples/apps/map-reduce_pi /user/root/examples/apps/map-reduce_pi
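A quick check that everything landed where job.properties points (oozie.wf.application.path):
hdfs dfs -ls /user/root/examples/apps/map-reduce_pi
hdfs dfs -ls /user/root/examples/apps/map-reduce_pi/lib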

5. Feed the input directory

The "pi" program generates the input data in the code, so one easy way is to run the "pi" MapReduce program, and copy the input directory when it is running.
[root@admin]# hdfs dfs -ls /user/root/inputforpi
Found 1 items
-rw-r--r--   3 root root        118 2014-07-14 11:53 /user/root/inputforpi/part0

6. Execute

oozie job -config /root/oozie/examples/apps/map-reduce_pi/job.properties -run
Monitor the job status and make sure it reaches SUCCEEDED:
oozie job -info <job ID>
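
Once it succeeds, you can inspect the result. Per QuasiMonteCarlo.java, the reducer writes its counts to a SequenceFile named "reduce-out" in the output directory, so use -text (which decodes SequenceFiles) rather than -cat:
hdfs dfs -ls /user/root/outputforpi
hdfs dfs -text /user/root/outputforpi/reduce-out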

7. Troubleshoot

Please refer to Troubleshoot Oozie MapReduce jobs.
