Executing Oozie workflow of spark jobs in shell action

Source: http://blog.bimarian.com/executing-oozie-workflow-of-spark-jobs-in-shell-action/
Oozie is a server-based Workflow Engine that runs in a Java servlet container to schedule and manage Hadoop jobs.
Oozie Workflow jobs are Directed Acyclic Graphs (DAGs) of actions. Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.
Oozie is integrated with the rest of the Hadoop stack and supports several types of Hadoop jobs, including Java map-reduce, streaming map-reduce, Pig, Hive, Sqoop and DistCp, as well as system-specific jobs such as Java programs and shell scripts.
Here we discuss a simple workflow that takes input from HDFS and performs a word count using Spark jobs. Job 1 passes its output to job 2, as programmed in the shell script.
Create spark1.jar as below:
package com.bimarian.spark.spark_proj;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCount1 {
    public static void main(String[] args) {
        Long startTime = System.currentTimeMillis();
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));

        // read the input file/directory passed as the first argument (an HDFS path)
        JavaRDD<String> file = sc.textFile(args[0]);

        // split each line into words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        // map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // count the occurrences of each word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

        // saveAsTextFile is the action that triggers the computation,
        // so measure the elapsed time after it completes
        counts.saveAsTextFile("sparkwordoutput1.csv");
        Long endTime = System.currentTimeMillis();
        Long actualTime = endTime - startTime;
        System.out.println("Time taken to complete the job: " + (actualTime / 1000) + " seconds");

        sc.close();
    }
}
Create spark2.jar by changing counts.saveAsTextFile("sparkwordoutput1.csv") to counts.saveAsTextFile("sparkwordoutput2.csv") in Eclipse.
The outputs are created with the names sparkwordoutput1.csv and sparkwordoutput2.csv under /user/yarn in HDFS.
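Maintaining two jars that differ only in a hard-coded output path is not strictly necessary. A minimal variation (not part of the original post) is to read the output path from a second command-line argument, so that a single jar serves both steps and script.sh supplies the output path to each spark-submit call. Shown as a fragment of the same main method, the change would look roughly like this:

// Hypothetical variation: read the output path from args[1] instead of hard-coding it,
// so that spark1.jar and spark2.jar collapse into a single jar.
JavaRDD<String> file = sc.textFile(args[0]);   // input path, exactly as before
// ... same flatMap / mapToPair / reduceByKey pipeline as above ...
counts.saveAsTextFile(args[1]);                // output path supplied on the command line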
Steps to define the workflow:
Environment: cloudera-quickstart-vm-5.3.0-vmware; Hadoop version 2.5, Oozie version 4.0.0
1. Create a shell_example directory under your home directory (/home/cloudera).
2. Under shell_example, create two directories named apps and input-data. Under the input-data directory, place the input file 'sample'.
[Figure: oozie-fig1]
3. Inside the apps directory, create a directory named shell.
4. In the shell directory, create three files: i) job.properties, ii) script.sh, iii) workflow.xml.
 job.properties file content:
nameNode=hdfs://quickstart.cloudera:8020
jobTracker=quickstart.cloudera:8032
queueName=default
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.use.system.libpath=true
oozie.wf.rerun.failnodes=true
oozieProjectRoot=shell_example
oozie.wf.application.path=${nameNode}/user/${user.name}/${oozieProjectRoot}/apps/shell
script.sh file contents:
#!/bin/sh
spark-submit --class com.bimarian.spark.spark_proj.WordCount1 --master local /home/cloudera/shell_example/apps/shell/lib/spark1.jar /user/cloudera/shell_example/input-data 2
spark-submit --class com.bimarian.spark.spark_proj.WordCount1 --master local /home/cloudera/shell_example/apps/shell/lib/spark2.jar /user/yarn/sparkwordoutput1.csv 2
Note: The input paths should point to HDFS; the second spark-submit reads sparkwordoutput1.csv, the output of the first job.
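Two details of the script are worth noting. As written, WordCount1 reads only args[0], so the trailing 2 on each spark-submit line is ignored. And because the second job consumes sparkwordoutput1.csv produced by the first, a failure of the first spark-submit would leave the second job without input; adding set -e at the top of the script (a common sh idiom, not part of the original post) would make the shell action fail fast in that situation.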
workflow.xml file content:
<workflow-app xmlns="uri:oozie:workflow:0.1" name="shell-wf">
    <start to="shell-node"/>
    <action name="shell-node">
        <shell xmlns="uri:oozie:shell-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <exec>script.sh</exec>
            <file>/user/cloudera/oozie-oozi/script.sh#script.sh</file>
            <capture-output/>
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Incorrect output</message>
    </kill>
    <end name="end"/>
</workflow-app>
5. Create a lib directory under shell containing the jar files spark1.jar and spark2.jar.
6. Place the shell_example directory in HDFS under /user/cloudera.
7. Place the script.sh file in the /user/cloudera/oozie-oozi directory; otherwise the job gets killed.
8. Execute the Oozie workflow as below. Note that the job.properties file should be present in the local file system, not in HDFS.
9. oozie job -oozie http://localhost:11000/oozie -config shell_example/apps/shell/job.properties -run
10. A job id is created.
11. Initially the job will be in the RUNNING state and finally will reach the SUCCEEDED state. The progress of the workflow can be monitored from the Oozie console at http://localhost:11000/oozie/.
12. The status of a job can be checked from the command line with:
oozie job -oozie http://localhost:11000/oozie -info <job-id>
[Figure: oozie-fig2]

13. The output appears as below in the /user/yarn/sparkwordoutput2.csv/part-00000 file in HDFS.
[Figure: oozie-fig3]
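The result can also be inspected directly from the command line with a standard HDFS command (assuming the same output location as above):
hdfs dfs -cat /user/yarn/sparkwordoutput2.csv/part-00000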
