Custom Partitioner in Spark

Welcome back to the Sparking world!

Today we will look at one of the most interesting operations we can perform in Spark: partitioning.

If you have a basic understanding of Spark, you probably know that Spark is built around a concept called the RDD (Resilient Distributed Dataset).

Each RDD is partitioned across the cluster. You can choose the number of partitions allocated to a newly created RDD. For example:
val testFile = sc.textFile("/user/root/testfile.txt", 3)
The second parameter of textFile is the minimum number of partitions for the RDD testFile, so Spark may create more than 3 depending on how the input is split.
To verify the actual count, use the command below in Scala:
testFile.partitions.size

Spark ships with two built-in partitioners:

1. Hash Partitioner - Uses Java's Object.hashCode method to determine the partition.

2. Range Partitioner - Splits the key space into ranges, so keys that fall within the same range go to the same partition.
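To make the two strategies concrete, here is a minimal plain-Scala sketch with no Spark dependency. The object and method names (PartitioningSketch, hashPartition, rangePartition) are made up for illustration and are not Spark APIs. The hash rule mirrors the usual "hashCode modulo number of partitions, kept non-negative" idea. The range rule assumes the split points are already known, whereas Spark's own range partitioner works them out by sampling the data.

```scala
// Illustrative only: plain Scala, hypothetical names, not Spark's classes.
object PartitioningSketch {
  // Hash strategy: hashCode modulo numPartitions, shifted so it is never negative.
  // A null key is sent to partition 0.
  def hashPartition(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  // Range strategy: upperBounds holds sorted split points (assumed to be given).
  // A key goes to the first range whose upper bound is >= key,
  // and to the last partition if it is larger than every bound.
  def rangePartition(key: Int, upperBounds: Seq[Int]): Int = {
    val idx = upperBounds.indexWhere(bound => key <= bound)
    if (idx >= 0) idx else upperBounds.length
  }

  def main(args: Array[String]): Unit = {
    Seq[Any]("spark", "rdd", 42).foreach { k =>
      println(s"$k -> hash partition ${hashPartition(k, 3)}")
    }
    Seq(5, 15, 99).foreach { k =>
      println(s"$k -> range partition ${rangePartition(k, Seq(10, 20))}")
    }
  }
}
```

Note that two split points give three ranges, so rangePartition with Seq(10, 20) can return 0, 1 or 2.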

One basic advantage of partitioning is that records with the same key are co-located, so less data has to be shuffled across the cluster in transformations such as groupByKey and reduceByKey, which improves job performance.
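As a rough illustration of that point, the sketch below partitions a small pair RDD once and then aggregates it with the same partitioner, so the records for each key are already in the right place when reduceByKey runs. The object name CoLocationSketch, the helper sumByKey, the sample pairs and the local[2] master are all assumptions chosen so the example can run on a single machine.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Illustrative only: hypothetical names and sample data, local master for a quick try-out.
object CoLocationSketch {
  def sumByKey(sc: SparkContext): Map[String, Int] = {
    val partitioner = new HashPartitioner(2)
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

    // Partition once and keep the result around so it is not recomputed.
    val partitioned = pairs.partitionBy(partitioner).persist()

    // Passing the same partitioner lets Spark combine values inside each
    // partition, because every key already sits in its target partition.
    val sums = partitioned.reduceByKey(partitioner, _ + _)
    sums.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CoLocationSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      sumByKey(sc).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

The benefit grows when the same partitioned RDD is reused by several key-based operations, since the cost of partitioning is paid only once.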

Let's create our own partitioner. Consider a file that contains both text and numbers. If I want to store the numbers in one partition and the text in another, the built-in partitioners above will not work for me.
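The routing rule itself is small enough to try out in plain Scala before involving Spark. The sketch below is only an illustration: the object name RoutingSketch, the method partitionFor and the sample tokens are hypothetical, and it assumes that integers go to partition 0 and everything else goes to partition 1.

```scala
import scala.util.Try

// Illustrative only: hypothetical names and sample tokens, no Spark needed.
object RoutingSketch {
  // 0 for tokens that parse as an integer, 1 for everything else.
  def partitionFor(token: String): Int =
    if (Try(token.toInt).isSuccess) 0 else 1

  def main(args: Array[String]): Unit = {
    val tokens = Seq("spark", "100", "rdd", "7")
    tokens
      .groupBy(partitionFor)
      .toSeq
      .sortBy(_._1)
      .foreach { case (partition, ts) =>
        println(s"partition $partition: ${ts.mkString(", ")}")
      }
  }
}
```

With this rule a value such as "3.14" counts as text, because it does not parse as an integer.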

So we need to design our own partitioner. Here is a snapshot of the sample data that I have.

Now we need to send the numeric and text values to two different partitions. Here is the code for that.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object CustomPartitionerDemo {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("CustomPartitionerDemo")
    val sc = new SparkContext(sparkConf)
    val inputFile = sc.textFile("/user/root/testfile")
    // Create a paired RDD: one (word, 1) pair per space-separated token
    val pairedData = inputFile.flatMap(x => x.split(" ")).map(x => (x, 1))
    // Apply the custom partitioner to the paired RDD
    val partitionedData = pairedData.partitionBy(new MyCustomerPartitioner(2))
    // Verify the result using mapPartitionsWithIndex
    val finalOut = partitionedData.mapPartitionsWithIndex { (partitionIndex, dataIterator) =>
      dataIterator.map(dataInfo => dataInfo + " is located in " + partitionIndex + " partition.")
    }
    // Save output in HDFS
    // Assumption: the output path is passed as the first program argument
    finalOut.saveAsTextFile(args(0))
    sc.stop()
  }
}

class MyCustomerPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  // The key's string form decides the partition
  override def getPartition(key: Any): Int = toInt(key.toString)

  // Two partitioners of this type are equal when they have the same partition count
  override def equals(other: Any): Boolean = other match {
    case dnp: MyCustomerPartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }

  // Keep hashCode consistent with equals
  override def hashCode: Int = numPartitions

  // Assumed mapping: numeric keys go to partition 0, text keys to partition 1
  def toInt(s: String): Int =
    try {
      s.toInt
      0
    } catch {
      case e: Exception => 1
    }
}
Snapshot of output:

