Controlling the number of Partitions in Spark for shuffle transformations (Ex. reduceByKey)
The previous article explored how input partitions are defined by Spark. This short article describes how partitions are defined when Spark needs to shuffle data.
Transformations which require Data Shuffling
Some transformations will require data to be shuffled. Examples of such transformations in Spark are:
1. groupByKey
2. reduceByKey
3. aggregateByKey
4. sortByKey
5. join
6. cogroup
7. cartesian
8. coalesce
9. repartition
10. repartitionAndSortWithinPartitions
All of the above methods provide overloaded versions that let you customize the desired number of partitions. The question is: when we use the method versions that do not specify the number of partitions, how many partitions will Spark define by default?
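As a minimal sketch of those overloads (using a small hypothetical in-memory pair RDD rather than real input data), the forms of reduceByKey and groupByKey that take an int fix the number of shuffle partitions explicitly:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

JavaSparkContext sc = new JavaSparkContext("local", "partitions-demo");

// Hypothetical in-memory data, just to illustrate the overloads.
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)));

// Passing an int (4 here) fixes the number of shuffle partitions explicitly,
// instead of letting Spark fall back to its default.
JavaPairRDD<String, Integer> summed = pairs.reduceByKey((x, y) -> x + y, 4);
JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey(4);

sc.close();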
The Default Behavior
By default Spark utilizes the HashPartitioner, which extends the Partitioner class. (The following article demonstrates how you can use a custom Partitioner.) By default, the Partitioner instance will define the number of partitions as follows:
1. First it will check the value of the Spark configuration parameter spark.default.parallelism. If it is configured, the default number of partitions is set equal to its value.
2. If spark.default.parallelism is not set, Spark will define the same number of partitions as exist in the largest upstream RDD. This choice is the least likely to cause out-of-memory errors.
You can read more details in the source code of the Partitioner.scala file; the comments on lines 42 through 56 explain the above.
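As a sketch of the first rule, if you wanted every such shuffle to default to, say, 8 partitions (an arbitrary illustrative value), you could set spark.default.parallelism on the SparkConf before creating the context:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Shuffle transformations invoked without an explicit numPartitions argument
// will now default to 8 partitions instead of the partition count of the
// largest upstream RDD.
SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("sparkwordcount")
        .set("spark.default.parallelism", "8");
JavaSparkContext sc = new JavaSparkContext(conf);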
A Simple Example
Consider the simple WordCount program below:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
JavaSparkContext sc = new JavaSparkContext("local", "sparkwordcount");
JavaRDD<String> rdd = sc.textFile(inputPath);
JavaPairRDD<String, Integer> counts = rdd
        .flatMap(x -> Arrays.asList(x.split(" ")))           // split each line into words
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))   // pair each word with a count of 1
        .reduceByKey((x, y) -> x + y);                        // shuffle: sum the counts per word
counts.saveAsTextFile(outputPath);                            // action: triggers job execution
sc.close();
In the above example, the number of partitions produced for the reduceByKey transformation will be the same as the number of input partitions created for the sc.textFile(inputPath) invocation. The previous article describes how the partitions are created for the upstream RDD; here, the upstream RDD is produced by the call sc.textFile(inputPath).
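A quick way to confirm this, assuming the rdd and counts variables from the example above are still in scope and spark.default.parallelism is not set, is to print the partition counts (getNumPartitions() is available on the Java API from Spark 1.6; on older versions rdd.partitions().size() gives the same number):

// Both counts should match: the shuffle inherits the partition count of the
// largest upstream RDD, which here is the RDD returned by sc.textFile(inputPath).
System.out.println("input partitions:   " + rdd.getNumPartitions());
System.out.println("shuffle partitions: " + counts.getNumPartitions());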
Of course, the actual execution only happens when an action method is invoked, which in our case is counts.saveAsTextFile(outputPath). But when Spark executes the job, it first needs to define the number of input partitions for the sc.textFile(inputPath) call; this decision cannot be deferred until later. As we saw in the previous article, Spark has the information it needs to define the number of input partitions.
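If you want to influence that input partition count (and, through it, the default number of shuffle partitions), the textFile overload that takes a minPartitions hint is one place to do it; a minimal sketch, with 8 as an arbitrary value:

// Hints to Spark that the input should be split into at least 8 partitions; with
// spark.default.parallelism unset, a downstream reduceByKey would default to
// however many input partitions this actually produces.
JavaRDD<String> rddWithHint = sc.textFile(inputPath, 8);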
Conclusion
This brief article explains how Spark defines the default number of shuffle partitions for transformations that require data shuffling by key.

By Sameer Wadkar - Big Data Architect/Programmer, Author and Open Source Contributor