Spark operators: RDD key-value transformation operations (2) - combineByKey, foldByKey

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
This function is used to convert an RDD[K, V] into an RDD[K, C], where the V and C types can be the same or different.
The parameters:
createCombiner: combiner function, used to turn a V into a C; its input is a V from the RDD[K, V] and its output is a C
mergeValue: merge-value function, used to merge a C and a V into a C; its input is (C, V) and its output is a C
mergeCombiners: merge-combiners function, used to merge two Cs into one C; its input is (C, C) and its output is a C
numPartitions: number of partitions of the result RDD; defaults to the existing number of partitions
partitioner: partition function; defaults to HashPartitioner
mapSideCombine: whether to combine on the map side, similar to a combiner in MapReduce; defaults to true
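The examples below only use the first variant. For completeness, here is a minimal sketch of calling the numPartitions and partitioner variants; the RDD contents and variable names are illustrative, not from the original post, and it assumes a SparkContext named sc as in spark-shell:

  import org.apache.spark.HashPartitioner

  val pairs = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1)))

  // Variant with numPartitions: same combine logic (a per-key sum here), result in 4 partitions.
  val byNum = pairs.combineByKey(
    (v: Int) => v,                  // createCombiner
    (c: Int, v: Int) => c + v,      // mergeValue
    (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
    4)

  // Variant with an explicit partitioner; mapSideCombine keeps its default of true.
  val byPart = pairs.combineByKey(
    (v: Int) => v,
    (c: Int, v: Int) => c + v,
    (c1: Int, c2: Int) => c1 + c2,
    new HashPartitioner(4))

  byNum.partitions.length  // 4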
Look at the following example:
  1. ("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1) ))
  2. Rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [64] at makeRDD at: 21
  3.  
  4. Scala> rdd1.combineByKey (
  5. (V: Int) => v + "_",
  6. (C: String, v: Int) => c + "@" + v,
  7. | (C1: String, c2: String) => c1 + "$" + c2
  8. |) .collect
  9. Res60: Array [(String, String)] = Array ((A, 2_ $ 1_), (B, 1_ $ 2_), (C, 1_))
The three mapping functions are:
createCombiner: (V) => C
(v: Int) => v + "_" // appends the character "_" to each V and returns a C (String)
mergeValue: (C, V) => C
(c: String, v: Int) => c + "@" + v // merges a C and a V with "@" in between, returning a C (String)
mergeCombiners: (C, C) => C
(c1: String, c2: String) => c1 + "$" + c2 // merges two Cs with "$" in between, returning a C (String)
The other parameters keep their default values.
Finally, the RDD[String, Int] is converted into an RDD[String, String].
See example:
  rdd1.combineByKey(
    (v: Int) => List(v),
    (c: List[Int], v: Int) => v :: c,
    (c1: List[Int], c2: List[Int]) => c1 ::: c2
  ).collect
  Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
Finally, this converts the RDD[String, Int] into an RDD[String, List[Int]].
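Because C can be a richer type than V, a classic use of combineByKey is a per-key average, accumulating (sum, count) pairs as the C type. This is not from the original examples, just a common pattern sketched under the same assumptions (a SparkContext named sc):

  // C = (sum, count); the final mapValues turns the accumulator into an average.
  val scores = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))

  val avg = scores.combineByKey(
    (v: Int) => (v, 1),                                                 // createCombiner: V => C
    (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                    // mergeValue: (C, V) => C
    (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)  // mergeCombiners: (C, C) => C
  ).mapValues { case (sum, count) => sum.toDouble / count }

  avg.collect  // Array((A,1.5), (B,1.5), (C,1.0))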

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
This function folds the Vs of an RDD[K, V] by key using the mapping function func. The parameter zeroValue is first applied to each V via the mapping function to initialize it, and the mapping function is then applied to the initialized Vs.
See the example directly:
  1. ("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1) ))
  2. Scala> rdd1.foldByKey (0) (_ + _). Collect
  3. Res75: Array [(String, Int)] = Array ((A, 2), (B, 3), (C, 1))
  4. (0), ("A", 2), and then add the value of each of the keys in rdd1, note that zeroValue = 0, need to initialize V, the mapping function is + First apply zeroValue to each V to get: ("A", 0 + 0), ("A", 2 + 0), ie:
  5. (A, 0 + 2), that is, (A, 2), and then apply the mapping function to the initialized V,
  6.  
Look again:
  scala> rdd1.foldByKey(2)(_ + _).collect
  res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
  // zeroValue = 2 is first applied to each V: ("A", 0 + 2), ("A", 2 + 2), i.e. ("A", 2), ("A", 4);
  // then the mapping function is applied to the initialized Vs: (A, 2 + 4), i.e. (A, 6).
Look at multiplication:
  scala> rdd1.foldByKey(0)(_ * _).collect
  res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
  // zeroValue = 0 is first applied to each V; note the mapping function is *: ("A", 0 * 0), ("A", 2 * 0),
  // i.e. ("A", 0), ("A", 0); then the mapping function is applied to the initialized Vs: (A, 0 * 0), i.e. (A, 0).
  // The other keys work the same way: every V ends up 0.

  scala> rdd1.foldByKey(1)(_ * _).collect
  res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
  // When the mapping function is multiplication, zeroValue must be set to 1 to get the expected result.
When using the foldByKey operator, pay special attention to how the mapping function interacts with the value of zeroValue.
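One caveat that goes beyond the original examples: Spark actually folds zeroValue in once per key per partition, not once per value. In the examples above each element happens to land in its own partition, which is why the per-value reading works out; with a non-identity zeroValue the result can therefore depend on the partitioning. A small sketch:

  // Same data, different number of partitions; assumes a SparkContext named sc.
  val one = sc.makeRDD(Array(("A", 0), ("A", 2)), 1)  // both values in one partition
  val two = sc.makeRDD(Array(("A", 0), ("A", 2)), 2)  // one value per partition

  one.foldByKey(2)(_ + _).collect  // Array((A,4)) : 2 + 0 + 2
  two.foldByKey(2)(_ + _).collect  // Array((A,6)) : (2 + 0) + (2 + 2)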
