Spark function to explain: aggregate

Let's start with the official documentation's definition of the aggregate function:

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. We need one operation for merging a T into a U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.

In other words, aggregate first folds the elements within each partition with seqOp, starting from zeroValue, and then combines the per-partition results with combOp, again starting from zeroValue. The final return type U does not need to match the element type of the RDD.

Function prototype

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
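Before the REPL examples, the two-stage behavior can be sketched in plain Scala, with no Spark required. The helper below, localAggregate, is a hypothetical illustration and not Spark's actual implementation: it splits a List into roughly equal groups to stand in for partitions, folds each group with seqOp starting from zeroValue, and then folds the per-group results with combOp, again starting from zeroValue.

```scala
// Hypothetical local sketch of aggregate's semantics; in real Spark,
// seqOp runs on the executors and combOp runs on the driver.
def localAggregate[T, U](data: List[T], numPartitions: Int)(zeroValue: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U = {
  val partitionSize = math.ceil(data.size.toDouble / numPartitions).toInt
  // Fold each "partition" with seqOp, starting from zeroValue
  val perPartition = data.grouped(partitionSize).toList.map(_.foldLeft(zeroValue)(seqOp))
  // Combine the partition results with combOp, again starting from zeroValue
  perPartition.foldLeft(zeroValue)(combOp)
}

// Mirrors the first REPL example below: zeroValue = 3, seqOp = min, combOp = +
val r = localAggregate(List(1, 2, 3, 4, 5, 6), 2)(3)((u: Int, t: Int) => math.min(u, t), _ + _)
println(r)  // 7: min(3,1,2,3) = 1 and min(3,4,5,6) = 3, then 3 + 1 + 3
```

Note that zeroValue is applied once per partition and then once more in the final combine, so it must be neutral for both operations if you want results that do not depend on the partition count.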

Example

/**
 * User: 过往记忆
 * Date: 15-02-12
 * Time: 08:30 AM
 * The 过往记忆 blog focuses on Hadoop, Hive, Spark, Shark, and Flume, with plenty of in-depth technical content
 * WeChat public account of the 过往记忆 blog: iteblog_hadoop
 */
 
scala> def seqOP(a:Int, b:Int) : Int = {
     | println("seqOp: " + a + "\t" + b)
     | math.min(a,b)
     | }
seqOP: (a: Int, b: Int)Int
 
scala> def combOp(a:Int, b:Int): Int = {
     | println("combOp: " + a + "\t" + b)
     | a + b
     | }
combOp: (a: Int, b: Int)Int
 
scala> val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
scala> z.aggregate(3)(seqOP, combOp)
seqOp: 3    1
seqOp: 3    4
seqOp: 1    2
seqOp: 3    5
seqOp: 1    3
seqOp: 3    6
combOp: 3   1
combOp: 4   3
 
res20: Int = 7

With two partitions, (1, 2, 3) and (4, 5, 6), seqOp computes min(3, 1, 2, 3) = 1 for the first and min(3, 4, 5, 6) = 3 for the second; combOp then adds both to zeroValue: 3 + 1 + 3 = 7.
 
scala> def seqOp(a:String, b:String) : String = {
     | println("seqOp: " + a + "\t" + b)
     | math.min(a.length , b.length ).toString
     | }
seqOp: (a: String, b: String)String
 
scala> def combOp(a:String, b:String) : String = {
     |  println("combOp: " + a + "\t" + b)
     | a + b
     | }
combOp: (a: String, b: String)String
 
scala> val z = sc.parallelize(List("12", "23", "345", "4567"), 2)
scala> z.aggregate("")(seqOp, combOp)
seqOp:  345
seqOp:  12
seqOp: 0    4567
seqOp: 0    23
combOp:     1
combOp: 1   1
 
res25: String = 11

Each partition reduces to the string "1" (the running minimum string length, converted to a string), and combOp concatenates the two partition results onto the empty zeroValue, giving "11".
 
scala> val z = sc.parallelize(List("12", "23", "345", ""), 2)
scala> z.aggregate("")(seqOp, combOp)
seqOp:  12
seqOp:  345
seqOp: 0    23
seqOp: 0   
combOp:     0
combOp: 0   1
res26: String = 01

Here the second partition contains the empty string, so its running minimum length ends at 0 and the partition reduces to "0"; concatenating the two partition results in the order the driver receives them yields "01".
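The string runs hint at a broader caveat: when the seqOp/combOp pair is neither associative nor commutative, as here, the result depends on how the elements are partitioned. A plain-Scala sketch of this effect (the manual splits below are assumptions for illustration; Spark chooses real partition boundaries itself):

```scala
// Same operations as the string example above
def seqOp(a: String, b: String): String = math.min(a.length, b.length).toString
def combOp(a: String, b: String): String = a + b

// Emulate aggregate over an explicit, hand-chosen partitioning
def run(partitions: List[List[String]]): String =
  partitions.map(_.foldLeft("")(seqOp)).foldLeft("")(combOp)

val data = List("12", "23", "345", "4567")
println(run(List(data.take(2), data.drop(2))))  // "11" with the 2-element split
println(run(List(data.take(1), data.drop(1))))  // "01" with a 1/3 split
println(run(List(data)))                        // "1" with a single partition
```

Three different partitionings of the same data give three different answers, which is why non-associative or non-commutative functions should be used with aggregate only when you control, and understand, the partitioning.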
