Processing Big Data with Apache Spark
Apache Spark is used for processing large data sets, including data stored in HDFS, HBase, and Cassandra, to perform ETL and advanced analytics. Spark has rich APIs for Java, Scala, and Python. At Bimarian we use Apache Spark extensively to perform ETL on Big Data. Because Spark is an in-memory computing framework, we use it to reduce unnecessary reads from and writes to disk, and we have seen very good performance for complex ETL operations in our Big Data applications.
Working with Spark:
Working with Spark begins by fetching data into a Spark RDD. An RDD (Resilient Distributed Dataset) is an immutable, distributed collection of records that can be loaded from stores such as HDFS or HBase. The data in an RDD is transformed according to the business logic, producing another RDD; the final output of a Spark program is thus an RDD holding the transformed data. The elements of the output RDD can be saved as a text file, an object file, and so on.
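As a minimal sketch of this RDD-to-RDD flow (assuming an existing JavaSparkContext named sc, a hypothetical input file input.txt, and the Function interface from org.apache.spark.api.java.function), the following fragment loads a file, filters out empty lines into a new RDD, and saves the result; the original RDD is left unchanged:
JavaRDD<String> lines = sc.textFile("input.txt");
// filter() is a transformation: it returns a new RDD and leaves 'lines' unchanged
JavaRDD<String> nonEmpty = lines.filter(new Function<String, Boolean>() {
    public Boolean call(String s) {
        return !s.trim().isEmpty();
    }
});
// The elements of the resulting RDD are written out as text part files
nonEmpty.saveAsTextFile("non_empty_lines");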
Here is a sample Spark program. Since Spark provides a Java API, let us look at a word-count example written in Java.
To try the code below, make sure the following are available on your machine:
Download Spark 1.2 jars
Eclipse IDE
To run the code:
Cloudera CDH 5.3.*
No additional configuration is needed, since Cloudera ships CDH with a full Spark setup.
We can also run Spark programs from our development IDE, just by changing the file paths. During development I would suggest this approach; once development is completely finished, run the program against larger data in cluster mode.
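As a side note (not part of the original sample, which hard-codes the master to local for simplicity), one way to keep the same jar runnable both from the IDE and on a cluster is to leave the master out of the code and pass it with --master at submit time:
SparkConf conf = new SparkConf().setAppName("Spark Word_Count");
// No setMaster() here; choose the master on the command line instead, e.g.
//   spark-submit --master local ...        for a quick local run
//   spark-submit --master yarn-client ...  on a CDH/YARN cluster (Spark 1.x syntax)
JavaSparkContext sc = new JavaSparkContext(conf);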
WordCountSpark.java:

package com.bimarian.spark.spark_proj;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class WordCountSpark {
    public static void main(String[] args) {
        // Create the Spark context; this is the base object of every Spark program
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("Spark Word_Count").setMaster("local"));

        // Load the input file into a Spark RDD
        JavaRDD<String> file = sc.textFile(args[0]);

        // Split each line into words
        JavaRDD<String> words = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts for each word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        });

        System.out.println("Word Count Output: " + counts.collect());
        counts.saveAsTextFile("spark_wordcount.csv");
        sc.close();
    }
}
Once the program is ready, build the jar and run it by passing the input file with the command below.
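Assuming a Maven project (the SNAPSHOT jar name below suggests one), the Spark dependency and the packaging step would look roughly like this; match the artifact and version to your cluster:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.0</version>
    <scope>provided</scope>
</dependency>

mvn clean package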
Command to run the above program:
spark-submit --class <Class_name> --master <master_URL> <jar_name> <input_file>
spark-submit --class com.bimarian.spark.spark_proj.WordCountSpark --master local sparkwordcount-0.0.1-SNAPSHOT.jar spark.txt
Output:
Console output from the System.out.println statement:
Word Count Output: [(are,3), (job,2), (is,2), (you,3), (hi,2), (old,1), (how,3), (doing,1), (what,2), (your,2)]
We can also see the same output as Hadoop-style part files in the spark_wordcount.csv folder: since the code calls counts.saveAsTextFile("spark_wordcount.csv"), the results are written to that folder.
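For example, after a run with a single partition the folder would typically look like this (exact part-file names depend on the number of partitions), with each line of the part file holding one (word,count) tuple:
spark_wordcount.csv/_SUCCESS
spark_wordcount.csv/part-00000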