Processing Big Data with Apache Spark

Apache Spark is used for processing and streaming over large data sets stored in HDFS, HBase, or Cassandra to perform ETL and advanced analytics. Spark has rich APIs that support the Java, Scala, and Python languages. At Bimarian we use Apache Spark extensively to perform ETL on Big Data. Because Spark is an in-memory computing framework, we use it to reduce unnecessary writes and reads to disk, and we have seen very good performance on complex ETL operations in our Big Data applications since adopting it.
Working with Spark:
Working with Spark begins by fetching data into a Spark RDD. RDDs (Resilient Distributed Datasets) are immutable, resilient, distributed collections of records that can be loaded from sources such as HDFS or HBase. The data in an RDD is transformed according to the business logic, producing another RDD; the final output of a Spark program is thus an RDD holding the transformed data. The elements of the output RDD can be saved as a text file, object file, etc.
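As a minimal sketch of this load, transform, save flow (the input path, output folder, and the filter step here are hypothetical, chosen only for illustration):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class RddFlowSketch {
    public static void main(String[] args) {
        // Local Spark context, for illustration only
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("RDD_Flow_Sketch").setMaster("local"));

        // 1. Fetch data into an RDD (hypothetical local path)
        JavaRDD<String> lines = sc.textFile("input.txt");

        // 2. Transform: each transformation returns a new, immutable RDD
        JavaRDD<String> nonEmpty = lines.filter(new Function<String, Boolean>() {
            public Boolean call(String s) {
                return !s.trim().isEmpty();
            }
        });

        // 3. Save the elements of the final RDD as text files
        nonEmpty.saveAsTextFile("rdd_flow_output");

        sc.close();
    }
}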
Here is a sample Spark program. Since Spark has an API designed for the Java language, let us look at a sample word count program written in Java.
To try the code below, ensure the following are readily available on your machine:
Spark 1.2 jars
Eclipse IDE
To run the code on a cluster:
Cloudera CDH 5.3.*
No additional configuration is needed, since Cloudera ships CDH with a complete Spark setup.
We can also run Spark programs from our development IDEs just by changing the file paths, and at the development stage I would suggest this approach. Once development is fully complete, run the program against larger data in cluster mode; a sketch of what changes between the two modes follows below.
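As a minimal sketch of that difference (the class name and the example master URLs here are assumptions, not part of the original project), the only code-level change is whether the master is hard-coded:

import org.apache.spark.SparkConf;

public class MasterConfigSketch {
    public static void main(String[] args) {
        // Development: hard-code a local master and use local file paths
        SparkConf devConf = new SparkConf()
                .setAppName("Spark Word_Count")
                .setMaster("local");

        // Cluster: leave the master out of the code and pass it to spark-submit instead,
        // e.g. --master yarn-cluster on CDH, or --master spark://host:7077 for standalone.
        // Input and output paths would then typically point to HDFS.
        SparkConf clusterConf = new SparkConf()
                .setAppName("Spark Word_Count");

        System.out.println("dev master: " + devConf.get("spark.master"));
        System.out.println("cluster conf sets a master: " + clusterConf.contains("spark.master"));
    }
}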
WordCountSpark.java:
package com.bimarian.spark.spark_proj;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCountSpark {
    public static void main(String[] args) {
        // Creating the Spark context object, the base object of every Spark program
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("Spark Word_Count").setMaster("local"));

        // Loading the data into a Spark RDD
        JavaRDD<String> file = sc.textFile(args[0]);

        // Split each line into words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Count the occurrences of each word by summing the values per key
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

        System.out.println("Word Count Output: " + counts.collect());
        counts.saveAsTextFile("spark_wordcount.csv");
        sc.close();
    }
}
Once the programming is done, generate the jar file and execute it, passing the input file on the command line as shown below.
Command to run the above program:
spark-submit --class <Class_name> --master <master_IP> <jar_name> <input_file> 2
spark-submit --class com.bimarian.spark.spark_proj.WordCountSpark --master local sparkwordcount-0.0.1-SNAPSHOT.jar spark.txt 2
Output:
Console output from the System.out.println statement:
Word Count Output: [(are,3), (job,2), (is,2), (you,3), (hi,2), (old,1), (how,3), (doing,1), (what,2), (your,2)]
We can observe the same output as Hadoop-style part files in the spark_wordcount.csv folder: since we called counts.saveAsTextFile("spark_wordcount.csv") in the code above, the output is written to that folder.
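As an optional sketch (assuming the run above produced the spark_wordcount.csv folder locally), the part files can be read back with sc.textFile, which accepts a directory of part files:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadWordCountOutput {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("Read_WordCount_Output").setMaster("local"));

        // textFile accepts a directory and reads all part-* files inside it
        JavaRDD<String> savedCounts = sc.textFile("spark_wordcount.csv");

        // Each line looks like "(word,count)", the same tuples printed by collect()
        for (String line : savedCounts.collect()) {
            System.out.println(line);
        }

        sc.close();
    }
}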
