Custom Partitioner in Spark
Welcome back to the Spark world!
Today we will be looking at one of the most interesting operations we can perform in Spark: partitioning.
If you have a basic understanding of Spark, you probably know that Spark is built around a concept called the RDD.
Each RDD is partitioned across the cluster, and you can choose the number of partitions allocated to a newly created RDD. For example:

val testFile = sc.textFile("/user/root/testfile.txt", 3)

The second parameter of the textFile function indicates the number of partitions to be allocated to the RDD testFile.
If you want to verify this, you can use the command below in the Scala shell:

testFile.partitions.size
Spark supports two types of partitioners out of the box (a short sketch of both follows the list):
1. Hash Partitioner - uses Java's Object.hashCode method on the key to determine the partition.
2. Range Partitioner - distributes keys to the respective partitions based on the range each key falls within.
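As an illustration, here is a minimal sketch of applying both built-in partitioners with partitionBy. It assumes a spark-shell session where sc is available; the small (word, count) RDD is made up for the example.

import org.apache.spark.{HashPartitioner, RangePartitioner}

// A small hypothetical paired RDD of (word, count) entries
val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 1), ("cherry", 1), ("42", 1)))

// Hash partitioning: the partition is derived from the key's hashCode
val hashed = pairs.partitionBy(new HashPartitioner(3))

// Range partitioning: keys are sampled and split into sorted ranges
val ranged = pairs.partitionBy(new RangePartitioner(3, pairs))

hashed.partitions.size  // 3
ranged.partitions.size  // 3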
One of the basic advantages of partitioning is that similar data is co-located, so the shuffling of data across the cluster is reduced in transformations like groupByKey, reduceByKey, etc., which improves job performance.
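To illustrate that point, here is a sketch (again using a made-up word-count style RDD) of how pre-partitioning and caching a paired RDD lets a later reduceByKey reuse the existing layout instead of shuffling again:

import org.apache.spark.HashPartitioner

// A hypothetical paired RDD of (word, count) entries
val wordPairs = sc.parallelize(Seq(("apple", 1), ("apple", 1), ("banana", 1)))

// Partition once and cache the result so the layout can be reused
val prePartitioned = wordPairs.partitionBy(new HashPartitioner(3)).persist()

// Because prePartitioned already has a known partitioner, reduceByKey can
// aggregate within the existing partitions without another full shuffle
val counts = prePartitioned.reduceByKey(_ + _)

counts.partitioner  // Some(HashPartitioner) - the partitioning is preserved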
Let's create our own partitioner. Consider an example where I have a file containing both text and numeric entries, and I want to store the numbers in one partition and the text in another; the default partitioners above will not work for this.
So we need to design our own partitioner. Here is a snapshot of the sample data I am using.
Now we need to route the numeric and text entries into two different partitions. Here is the code for the same.
package com.td.SparkHbaseDemo.SparkCustomPartitioner

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object CustomPartitionerDemo {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("CustomPartitionerDemo")
    val sc = new SparkContext(sparkConf)
    val inputFile = sc.textFile("/user/root/testfile")

    // Create paired RDD of (token, 1)
    val pairedData = inputFile.flatMap(x => x.split(" ")).map(x => (x, 1))

    // Repartition the paired RDD using the custom partitioner
    val partitionedData = pairedData.partitionBy(new MyCustomerPartitioner(2))

    // Verify the result using mapPartitionsWithIndex
    val finalOut = partitionedData.mapPartitionsWithIndex { (partitionIndex, dataIterator) =>
      dataIterator.map(dataInfo => (dataInfo + " is located in " + partitionIndex + " partition."))
    }

    // Save output in HDFS
    finalOut.saveAsTextFile("/user/root/partitionOutput")
  }
}

class MyCustomerPartitioner(numParts: Int) extends Partitioner {

  override def numPartitions: Int = numParts

  // Numeric keys go to partition 0, everything else to partition 1
  override def getPartition(key: Any): Int = {
    val out = toInt(key.toString)
    out
  }

  override def equals(other: Any): Boolean = other match {
    case dnp: MyCustomerPartitioner => dnp.numPartitions == numPartitions
    case _ => false
  }

  // Returns 0 if the string parses as an Int, 1 otherwise
  def toInt(s: String): Int = {
    try {
      s.toInt
      0
    } catch {
      case e: Exception => 1
    }
  }
}
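As a quick sanity check, the routing logic can also be exercised directly, for example in a spark-shell session where the class above has been pasted in (a sketch, not part of the original job):

val p = new MyCustomerPartitioner(2)
p.getPartition("123")    // returns 0 - parses as a number, so the numeric partition
p.getPartition("hello")  // returns 1 - not a number, so the text partition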
Snapshot of output:
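To give an idea of the format, based on the code above each saved line is the (token, 1) pair followed by its partition index. For a hypothetical input line such as "hello 123 world 456" (illustrative only, not the original sample data), the output files would contain entries along the lines of:

(123,1) is located in 0 partition.
(456,1) is located in 0 partition.
(hello,1) is located in 1 partition.
(world,1) is located in 1 partition.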