Sampling Large Datasets using Spark

Unit testing is essential when developing Big Data programs. There is an entire chapter (Chapter 8) devoted to this subject in my book Pro Apache Hadoop.
To test Big Data programs effectively, you need smaller datasets that are representative of the large dataset. In this article we will explore how to create such a dataset using Apache Spark.

The Airline Dataset

I will use the same dataset I used when developing code for my book: the Airline dataset, which consists of flight arrival and departure details of commercial domestic flights in the United States from 1987 to 2008. This dataset holds approximately 120 million records in comma-separated value (CSV) format, and its total uncompressed size is 120 GB. Although it is large enough to be suitable for Big Data processing, it is not so large that you cannot run programs on it locally using Hadoop or Spark. Thus it can be used for performing tests that enable you to do "back of the envelope" calculations and create benchmarks.
The full dataset can be downloaded from here. I also recommend exploring the links on the sidebar, especially the link to the results of the competition this dataset was used in. It contains some excellent and challenging ideas you might want to reproduce in the course of your Big Data learning.

Problem Statement

Our goal is to generate a sampled dataset. The fraction of data to be sampled will be provided as an input. I have used a fraction of 0.001, which is 0.1% of the original dataset and translates to approximately 120,000 records (120 million x 0.001). That is adequate for most testing of the applications we will develop on top of the airline data.
I intend to translate my entire book, as well as other books on Spark, to Java 8 using this dataset. This article is the first in that series.
I downloaded the entire airline dataset to my Windows folder C:/MyData/AirlineData.
I have generated the output in the folder hdfs/airlinedata under the "SparkLearningUsingJava8" project on GitHub, and you can use it directly from there.

Implementation

The source code is available in the GitHub project "SparkLearningUsingJava8"
The Java 8 based Spark implementation is in the class com.axiomine.spark.examples.airline.datagen.AirlineDataSampler
The Java 7 based Spark implementation is in the class com.axiomine.spark.examples.airline.datagen17.AirlineDataSampler
The diagram below shows the implementation.
[Image: implementation overview diagram]
If you developed the same program in MapReduce, it would take many more lines of code and several more classes. One thing I love about Spark compared to MapReduce is how it provides efficient abstractions to concisely express what you are trying to develop, yet it does not take away any of the power you had with MapReduce. This is a key difference compared to other high-level abstractions built on MapReduce, such as Hive and Pig. They make it easier to work with MapReduce, but every once in a while you hit a brick wall because they do not do what you want them to do efficiently.
In this respect I like Spark for the same reason I liked the Spring Framework. It provided a framework but did not dictate how you should write your business logic. It made your life as a developer easy but did not attempt to restrict you under the patronizing guise of following the "Best Practices".

Process Layout

It is very important to understand the order of method invocations when using Spark. The following presentation by Aaron Davidson is a great introduction to this complex and important topic.
The diagram below demonstrates where in the distributed system each method invocation on the Spark context executes.
[Image: where each Spark method invocation executes in the distributed system]
I am going to use MapReduce terminology (Mapper node vs Reducer node) to describe the process since readers will be familiar with it.
Notice that the following calls are invoked on the Mapper node responsible for processing its portion of the file. (Each file is compressed using the bz2 compression scheme, which is splittable; hence one file can be processed by multiple Mapper nodes.)
First, read the file into a JavaRDD instance:
JavaRDD<String> rdd = sc.textFile(inputFile);
Next, sample the records and generate key-value pairs:
float fraction = 0.001f;
rdd.sample(false, fraction)
   .mapToPair(l -> {
       String[] parts = StringUtils.splitPreserveAllTokens(l, ",");
       String yrMoDd = parts[0] + "," + parts[1] + "," + parts[2];
       return new Tuple2<String, String>(yrMoDd, l);
   });
The mapToPair method call generates tuples that have the following format:
Key = {YEAR},{MONTH},{DATE}
Value = {Original Record}
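To make the key format concrete, here is a small standalone illustration. The record below is made up and truncated; only its first three columns (Year, Month, DayofMonth) matter for the key, and StringUtils is assumed to be Apache Commons Lang's, as in the snippet above.
import org.apache.commons.lang3.StringUtils;
import scala.Tuple2;

public class KeyExtractionDemo {
    public static void main(String[] args) {
        // Made-up, truncated airline record; the first three columns are Year, Month, DayofMonth
        String line = "1987,10,14,3,741,730,912,849,PS,1451";
        String[] parts = StringUtils.splitPreserveAllTokens(line, ",");
        String yrMoDd = parts[0] + "," + parts[1] + "," + parts[2];
        Tuple2<String, String> kv = new Tuple2<String, String>(yrMoDd, line);
        System.out.println(kv._1); // prints: 1987,10,14
        System.out.println(kv._2); // prints the full original record
    }
}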
Note the order of processing: we sample before we extract the key-value pairs. That way we produce key-value pairs only for the sampled records.
How is sampling without replacement done? Let's say we need 0.1% of the records, that is, 1 out of every 1000 records. One way to get 0.1% of the records is to take roughly 1 record out of every 1000 records encountered. One of the assumptions here is that the records with the expected characteristics (a representative smaller sample, in our case) are uniformly distributed.
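Conceptually, sampling without replacement behaves like flipping a biased coin for each record. The following is a simplified plain-Java sketch of that idea, not Spark's actual implementation:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SamplingWithoutReplacementSketch {
    // Each record is kept independently with probability equal to the requested fraction
    public static List<String> sample(List<String> records, double fraction) {
        Random rng = new Random();
        List<String> sampled = new ArrayList<String>();
        for (String record : records) {
            if (rng.nextDouble() < fraction) { // with fraction = 0.001, roughly 1 in 1000 records is kept
                sampled.add(record);
            }
        }
        return sampled;
    }
}
Spark applies this kind of per-record selection independently within each partition, which is why the size of the sampled output is approximate rather than exact.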
Then we have to repartition the records. The original dataset had each year's flight records in its own file, so we want to re-partition the output based on year. This is where Spark truly shines. Since the original dataset was already partitioned by year, Spark will not do a full-scale shuffle; it will allocate a node to each year's data (for example, 1987 data will get its own node, and 1988 data will also get its own node). The data will be shuffled to this node based on the year, but if the node already has the data for that year, it will not be reshuffled. Thus, if the 1987 data was entirely processed on one node, no shuffling will be needed for it.
Finally, the data will need to be sorted by the date (YYYY-MM-DD) of the flight. Sorting is similar to the MapReduce sort-shuffle phase: it occurs partially on the Mapper node and partially on the Reducer node. One of my earlier blog articles on MapReduce internals describes this process in detail, as does the last section of Chapter 5 of my book.
Thus, the following lines run partially on the Mapper node and partially on the Reducer node:
.repartitionAndSortWithinPartitions(
    new CustomPartitioner(noOfPartitions), // Partition the output by year, as in the original input
    new CustomComparator())                // Sort in ascending order by Year, Month and Day
The implementation of the CustomPartitioner is as follows
public static class CustomPartitioner extends Partitioner implements Serializable
{
    private static final long serialVersionUID = 1L;
    private int partitions;
    public CustomPartitioner(int noOfPartitioners){
        partitions=noOfPartitioners; 
    }

    @Override
    public int getPartition(Object key) {
        String[] sa = StringUtils.splitPreserveAllTokens(key.toString(), ',');
        int y = (Integer.parseInt(sa[0])-1987);
        return (y%partitions);
    }

    @Override
    public int numPartitions() {
        return partitions;
    }       
} 
The partitioner processes the key portion of the tuples and partitions each tuple based on the year of the flight record. This ensures that the output files are partitioned such that the same year's flight records land in the same output file.
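As a quick illustration (the partition count of 22 is my assumption, one partition per year from 1987 to 2008), the partitioner maps keys to partition indices as follows:
CustomPartitioner partitioner = new CustomPartitioner(22);
partitioner.getPartition("1987,10,14"); // returns 0  -> first partition holds the 1987 records
partitioner.getPartition("1988,1,3");   // returns 1  -> second partition holds the 1988 records
partitioner.getPartition("2008,12,31"); // returns 21 -> last partition holds the 2008 records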
The implementation of the CustomComparator is as follows
public static class CustomComparator implements Comparator,Serializable
{
    private static final long serialVersionUID = 1L;
    @Override
    public int compare(Object o1, Object o2) {
        String s1 = (String) o1;
        String s2 = (String) o2;
        String[] p1 = StringUtils.splitPreserveAllTokens(s1, ',');
        String[] p2 = StringUtils.splitPreserveAllTokens(s2, ',');
        Integer y1 = Integer.parseInt(p1[0]);
        Integer y2 = Integer.parseInt(p2[0]);
        int result = y1.compareTo(y2);
        if(result==0){
            Integer m1 = Integer.parseInt(p1[1]);
            Integer m2 = Integer.parseInt(p2[1]);
            result = m1.compareTo(m2);
        }           
        if(result==0){
            Integer d1 = Integer.parseInt(p1[2]);
            Integer d2 = Integer.parseInt(p2[2]);
            result = d1.compareTo(d2);
        }           
        return result;
    }       
}
The role of the CustomComparator is to ensure that records in the output file are sorted in ascending order by Year, Month and Day. The CustomComparator performs the sorting after the CustomPartitioner has performed the partitioning.
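For example (an illustrative check, not part of the original program), the comparator orders keys chronologically:
CustomComparator comparator = new CustomComparator();
comparator.compare("1987,10,14", "1987,11,2");  // negative: October 1987 sorts before November 1987
comparator.compare("1988,1,1", "1987,12,31");   // positive: 1988 sorts after 1987
comparator.compare("1987,10,14", "1987,10,14"); // zero: identical dates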
*Partitioning happens on the Mapper node. Sorting occurs partially on the Mapper node, on the subset of the partitioned data, and partially on the Reducer node, on the data shuffled to it from the respective Mapper nodes after partitioning and partial sorting.*

Finally, the following executes on the Reducer node:
.map(t->t._2) //Process just the value
.saveAsTextFile(outputPath); //Write to file
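Putting the pieces together, the complete chain might look like the sketch below. The class name, the local[*] master setting, and the partition count of 22 are my assumptions for illustration (CustomPartitioner and CustomComparator are the classes shown above); the actual implementation is in com.axiomine.spark.examples.airline.datagen.AirlineDataSampler in the GitHub project.
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

// Minimal end-to-end sketch of the sampling pipeline described above
public class AirlineDataSamplerSketch {
    public static void main(String[] args) {
        String inputPath   = args[0];                   // full airline dataset
        String outputPath  = args[1];                   // sampled output folder
        float fraction     = Float.parseFloat(args[2]); // e.g. 0.001
        int noOfPartitions = 22;                        // assumption: one partition per year, 1987-2008

        SparkConf conf = new SparkConf().setAppName("AirlineDataSampler").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> rdd = sc.textFile(inputPath);
        rdd.sample(false, fraction)                     // sample first, on the Mapper side
           .mapToPair(l -> {                            // key = Year,Month,Day; value = original record
               String[] parts = StringUtils.splitPreserveAllTokens(l, ",");
               String yrMoDd = parts[0] + "," + parts[1] + "," + parts[2];
               return new Tuple2<String, String>(yrMoDd, l);
           })
           .repartitionAndSortWithinPartitions(
               new CustomPartitioner(noOfPartitions),   // partition by year
               new CustomComparator())                  // sort by Year, Month, Day
           .map(t -> t._2)                              // keep only the original record
           .saveAsTextFile(outputPath);

        sc.stop();
    }
}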
On Windows, I ran the program with the following parameters:
set HADOOP_HOME=C:/github/spark12/hadoop-common-2.2.0
set FULL_DATASET=C:/MyData/airlinedata/data
set SAMPLED_DATASET_FOLDER=hdfs/input/airlinedata/
set FRACTION=0.001

java -Dhadoop.home.dir=%HADOOP_HOME% com.axiomine.spark.examples.airline.datagen.AirlineDataSampler ^
     %FULL_DATASET% %SAMPLED_DATASET_FOLDER% %FRACTION%
Note: On Windows you will need to set the value of HADOOP_HOME and pass it as a system property named hadoop.home.dir. This is the folder whose bin subfolder contains winutils.exe. See my previous blog article for the details.

Conclusion

You can use the above program to create your own sampled datasets. You will only need to rewrite the CustomPartitioner and CustomComparator classes, as well as how you generate the key-value pairs in the mapToPair method call. Once you do that, you have a reliable and concise way to generate a sampled dataset, which is the first step toward writing good unit tests.
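For instance, if you wanted to sample with keys based on the carrier rather than the date (assuming, purely for illustration, that the carrier code is the ninth column of each record), only the key extraction inside mapToPair changes; the partitioner and comparator would then be rewritten to work with carrier keys:
.mapToPair(l -> {
    String[] parts = StringUtils.splitPreserveAllTokens(l, ",");
    String carrier = parts[8]; // assumed column position, for illustration only
    return new Tuple2<String, String>(carrier, l);
})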
