Sampling Large Datasets using Spark
Unit testing is proving to be essential when developing Big Data programs. An entire chapter (Chapter 8) of my book Pro Apache Hadoop is devoted to this subject.
To test Big Data programs effectively you need smaller datasets that are representative of the large dataset. In this article we will explore how to create such a dataset using Apache Spark.
The Airline Dataset
I will use the same dataset I used when developing the code for my book - the Airline dataset, which consists of flight arrival and departure details of commercial domestic flights in the United States from 1987 to 2008. This dataset holds approximately 120 million records in comma-separated value (CSV) format, with a total uncompressed size of roughly 12 GB. Although it is large enough to be suitable for Big Data processing, it is not too large to run programs on locally using Hadoop or Spark. It can therefore be used for tests that enable you to do "back of the envelope" calculations and create benchmarks. The full dataset can be downloaded from here. I also recommend exploring the links in the sidebar, especially the one describing the results of the competition this dataset was used in. It contains some excellent and challenging ideas you might want to reproduce in the course of your Big Data learning.
Problem Statement
Our goal is to generate a sampled dataset. The fraction of data to be sampled will be provided as an input. I have used a fraction of 0.001, which is 0.1% of the original dataset and translates to approximately 120,000 records; this is adequate for most testing of the applications we will develop on top of the airline data.
I intend to translate my entire book, as well as other books on Spark, to Java 8 using this dataset. This article is the first in that series.
I downloaded the entire airline dataset to my Windows folder C:/MyData/AirlineData. I have generated the output in the folder hdfs/airlinedata under the "SparkLearningUsingJava8" project on Github, and you can use it directly from there.
Implementation
The source code is available in the Github project "SparkLearningUsingJava8". The Java 8 based Spark implementation is in the class
com.axiomine.spark.examples.airline.datagen.AirlineDataSampler
The Java 7 based Spark implementation is in the class
com.axiomine.spark.examples.airline.datagen17.AirlineDataSampler
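The snippets in the rest of this article are fragments of this class. For readers who want to follow along outside the Github project, here is a minimal sketch of the surrounding driver setup I am assuming; the SparkConf settings and the local master are illustrative, and the argument order matches the run command shown at the end of the article.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class AirlineDataSampler {
    public static void main(String[] args) {
        // Arguments, in the same order as the run command at the end of this article:
        // <full dataset folder> <sampled output folder> <fraction>
        String inputFile = args[0];
        String outputPath = args[1];
        float fraction = Float.parseFloat(args[2]);

        SparkConf conf = new SparkConf().setAppName("AirlineDataSampler")
                                        .setMaster("local[*]"); // illustrative: run on the local machine
        JavaSparkContext sc = new JavaSparkContext(conf);

        // ... the transformations discussed below go here ...

        sc.stop();
    }
}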
The diagram below shows the implementation
If you were to develop the same program in MapReduce, it would take many more lines of code and several more classes. One thing I love about Spark compared to MapReduce is how it provides efficient abstractions to concisely express what you are trying to develop, without taking away any of the power you had with MapReduce. This is a key difference from other high-level abstractions built on MapReduce, such as Hive and Pig: they make it easier to work with MapReduce, but every once in a while you hit a brick wall because they cannot do what you want efficiently.
In this respect I like Spark for the same reason I liked the Spring Framework. It provided a framework but did not dictate how you should write your business logic. It made your life as a developer easier but did not attempt to restrict you under the patronizing guise of following "Best Practices".
Process Layout
It is very important to understand the order of method invocations when using Spark. The following presentation by Aaron Davidson is a great introduction to this complex and important topic. The diagram below demonstrates where in the distributed system each method invocation on the Spark context executes.
I am going to use MapReduce terminology (Mapper node vs Reducer node) to describe the process since readers will be familiar with it.
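As an overview, here is the full chain of calls, assembled from the fragments discussed below and annotated with where each step runs (the driver simply builds this chain; the work happens on the cluster when saveAsTextFile is invoked):
JavaRDD<String> rdd = sc.textFile(inputFile);          // read: Mapper nodes

rdd.sample(false, fraction)                            // sample: Mapper nodes
   .mapToPair(l -> {                                   // build key-value pairs: Mapper nodes
       String[] parts = StringUtils.splitPreserveAllTokens(l, ",");
       String yrMoDd = parts[0] + "," + parts[1] + "," + parts[2];
       return new Tuple2<String, String>(yrMoDd, l);
   })
   .repartitionAndSortWithinPartitions(                // partition: Mapper nodes;
       new CustomPartitioner(noOfPartitions),          // sort: partly Mapper, partly Reducer nodes
       new CustomComparator())
   .map(t -> t._2)                                     // keep only the value: Reducer nodes
   .saveAsTextFile(outputPath);                        // write the output files: Reducer nodes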
Notice that the following calls are invoked on the Mapper node responsible for processing its portion of the file. (Each file is compressed using the bzip2 compression scheme, which is splittable, so a single file can be processed by multiple Mapper nodes.)
First, read the file into a JavaRDD instance.
JavaRDD<String> rdd = sc.textFile(inputFile);
Next, sample the records and generate key-value pairs.
float fraction = 0.001f;
rdd.sample(false, fraction)
   .mapToPair(l -> {
       String[] parts = StringUtils.splitPreserveAllTokens(l, ",");
       String yrMoDd = parts[0] + "," + parts[1] + "," + parts[2];
       return new Tuple2<String, String>(yrMoDd, l);
   })                    // the chain continues below with repartitionAndSortWithinPartitions
The mapToPair method call generates tuples which have the following format:
Key = {YEAR},{MONTH},{DATE}
Value = {Original Record}
Note the order of processing: we sample before we extract the key-value pairs, so that key-value pairs are produced only for the sampled records.
How is sampling without replacement done? Let's say we need 0.1% of the records, that is, 1 out of every 1000 records. One way to get 0.1% of the records is to take 1 record for every 1000 records encountered. One of the assumptions here is that records with the expected characteristics (a representative smaller sample, in our case) are uniformly distributed.
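To make the idea concrete, here is a minimal sketch (in plain Java, not Spark's actual implementation) of sampling without replacement: each record is kept independently with a probability equal to the fraction, so a fraction of 0.001 keeps roughly 1 record out of every 1000, and each record can be selected at most once.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SamplingSketch {
    // Keep each record with probability 'fraction'; a record is never taken twice.
    static List<String> sampleWithoutReplacement(List<String> records, double fraction, long seed) {
        Random rng = new Random(seed);              // a fixed seed makes the sample reproducible
        List<String> sampled = new ArrayList<>();
        for (String record : records) {
            if (rng.nextDouble() < fraction) {      // independent coin flip per record
                sampled.add(record);
            }
        }
        return sampled;                             // on average, records.size() * fraction entries
    }
}
On roughly 120 million records, a fraction of 0.001 therefore yields about 120,000 sampled records on average, which matches the target size discussed earlier.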
Then we have to repartition the records. The original dataset had each year's flight records in its own file, so we have to re-partition the output by year. This is where Spark truly shines. Since the original dataset was already partitioned by year, Spark will not do a full-scale shuffle: it will allocate a node to each year's data (for example, the 1987 data will get its own node, and the 1988 data will get its own node). Data will be shuffled to that node based on the year, but if a node already holds the data for a year, that data will not be reshuffled. Thus, if the 1987 data was entirely processed on one node, no shuffling is needed for it.
Finally, the data needs to be sorted by the date (YYYY-MM-DD) of the flight. Sorting is similar to the MapReduce sort-shuffle phase: it occurs partially on the Mapper node and partially on the Reducer node. One of my earlier blog articles on MapReduce internals describes this process in detail, as does the last section of Chapter 5 of my book.
Thus the following lines run partially on the Mapper node and partially on the Reducer node.
.repartitionAndSortWithinPartitions(
    new CustomPartitioner(noOfPartitions), // partition the output by year, like the original input
    new CustomComparator())                // sort in ascending order by Year, Month and Day
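The noOfPartitions value is not defined in the fragment above; since the dataset covers 1987 through 2008 and we want one output partition per year, a natural choice (and the one assumed in the examples below) is:
int noOfPartitions = 2008 - 1987 + 1; // 22 partitions, one per year of the dataset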
The implementation of the CustomPartitioner is as follows. The partitioner processes the key portion of each tuple and assigns the tuple to a partition based on the year of the flight record. This ensures that the output files are partitioned such that flight records for the same year end up in the same output file.
public static class CustomPartitioner extends Partitioner implements Serializable
{
    private static final long serialVersionUID = 1L;
    private int partitions;

    public CustomPartitioner(int noOfPartitioners){
        partitions = noOfPartitioners;
    }

    @Override
    public int getPartition(Object key) {
        String[] sa = StringUtils.splitPreserveAllTokens(key.toString(), ',');
        int y = (Integer.parseInt(sa[0]) - 1987); // dataset years start at 1987, so 1987..2008 map to 0..21
        return (y % partitions);
    }

    @Override
    public int numPartitions() {
        return partitions;
    }
}
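As a quick sanity check of the partitioning logic (assuming 22 partitions as above), the year in the key determines the partition index:
CustomPartitioner partitioner = new CustomPartitioner(22);
System.out.println(partitioner.getPartition("1987,10,14")); // (1987 - 1987) % 22 = 0
System.out.println(partitioner.getPartition("1988,1,1"));   // (1988 - 1987) % 22 = 1
System.out.println(partitioner.getPartition("2008,12,31")); // (2008 - 1987) % 22 = 21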
The implementation of the CustomComparator is as follows. Its role is to ensure that records in the output file are sorted in ascending order by year, month and day. The CustomComparator performs the sorting after the CustomPartitioner has performed the partitioning.
public static class CustomComparator implements Comparator, Serializable
{
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Object o1, Object o2) {
        String s1 = (String) o1;
        String s2 = (String) o2;
        String[] p1 = StringUtils.splitPreserveAllTokens(s1, ',');
        String[] p2 = StringUtils.splitPreserveAllTokens(s2, ',');
        Integer y1 = Integer.parseInt(p1[0]);   // compare by year first
        Integer y2 = Integer.parseInt(p2[0]);
        int result = y1.compareTo(y2);
        if (result == 0) {                      // then by month
            Integer m1 = Integer.parseInt(p1[1]);
            Integer m2 = Integer.parseInt(p2[1]);
            result = m1.compareTo(m2);
        }
        if (result == 0) {                      // then by day
            Integer d1 = Integer.parseInt(p1[2]);
            Integer d2 = Integer.parseInt(p2[2]);
            result = d1.compareTo(d2);
        }
        return result;
    }
}
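A similar sanity check for the comparator: keys compare numerically by year, then month, then day, so for example:
CustomComparator comparator = new CustomComparator();
System.out.println(comparator.compare("1987,12,31", "1988,1,1")); // negative: 1987 sorts before 1988
System.out.println(comparator.compare("1988,1,2", "1988,1,10"));  // negative: numeric compare, so day 2 precedes day 10
System.out.println(comparator.compare("1988,3,5", "1988,3,5"));   // zero: identical dates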
Note: Partitioning happens on the Mapper node. Sorting occurs partially on the Mapper node, on the subset of partitioned data, and partially on the Reducer node, on the data shuffled to it from the respective Mapper nodes after partitioning and partial sorting.
Finally, the following executes on the Reducer node:
.map(t->t._2) //Process just the value
.saveAsTextFile(outputPath); //Write to file
On Windows I ran the program with the following parameters:
set HADOOP_HOME=C:/github/spark12/hadoop-common-2.2.0
set FULL_DATASET=C:/MyData/airlinedata/data
set SAMPLED_DATASET_FOLDER=hdfs/input/airlinedata/
set FRACTION=0.001
java -Dhadoop.home.dir=%HADOOP_HOME% com.axiomine.spark.examples.airline.datagen.AirlineDataSampler %FULL_DATASET% %SAMPLED_DATASET_FOLDER% %FRACTION%
Note: On Windows you will need to set the value of HADOOP_HOME and pass it as a System property named hadoop.home.dir. This is the folder whose bin subfolder contains winutils.exe. See my previous blog article for the details.
Conclusion
You can use the above program to create your own sampled datasets. You will only need to re-write the CustomPartitioner and CustomComparator classes, and change how you generate the key-value pairs in the mapToPair method call. Once you do that, you have a reliable and concise way to generate a sampled dataset, which is the first step toward writing good unit tests.