Spark operators: RDD key-value transformation operations (2) - combineByKey, foldByKey
combineByKey
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
This function is used to transform an RDD[(K, V)] into an RDD[(K, C)], where the types V and C can be the same or different.
Parameters:
createCombiner : combiner creation function, used to turn a V into a C; its input is a V from the RDD[(K, V)] and its output is a C
mergeValue : value-merging function, which merges a C and a V into a C; its input is (C, V) and its output is a C
mergeCombiners : combiner-merging function, used to merge two C values into one C; its input is (C, C) and its output is a C
numPartitions : the number of partitions of the resulting RDD; defaults to the existing number of partitions
partitioner : the partition function; defaults to HashPartitioner
mapSideCombine : whether to combine on the map side, similar to a combiner in MapReduce; defaults to true
Look at the following example:

scala> var rdd1 = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21

scala> rdd1.combineByKey(
     |   (v: Int) => v + "_",
     |   (c: String, v: Int) => c + "@" + v,
     |   (c1: String, c2: String) => c1 + "$" + c2
     | ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

The three mapping functions are:
createCombiner: (V) => C
(v: Int) => v + "_" // append the character "_" to each V and return a C (String)
mergeValue: (C, V) => C
(c: String, v: Int) => c + "@" + v // merge a C and a V, inserting "@" between them, and return a C (String)
mergeCombiners: (C, C) => C
(c1: String, c2: String) => c1 + "$" + c2 // merge two Cs, inserting "$" between them, and return a C (String)
The other parameters take their default values.
In the end, RDD[(String, Int)] is transformed into RDD[(String, String)].
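A minimal plain-Scala sketch (no Spark required) may help show how the three functions cooperate: within each partition, the first value of a key goes through createCombiner and later values through mergeValue (the map-side combine); the per-partition results are then merged with mergeCombiners. The split of the five records into two partitions below is an assumption, and the order in which combiners are merged may differ from the Spark output above.

```scala
// Sketch of combineByKey semantics over pre-partitioned local data.
// Assumption: input is a Seq of partitions, each a Seq of (K, V) pairs.
object CombineByKeySketch {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]],
                            createCombiner: V => C,
                            mergeValue: (C, V) => C,
                            mergeCombiners: (C, C) => C): Map[K, C] = {
    // Map side: within each partition, the first V of a key is turned into a
    // combiner with createCombiner; subsequent Vs are folded in with mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k) match {
          case Some(c) => mergeValue(c, v)
          case None    => createCombiner(v)
        })
      }
    }
    // Reduce side: partial combiners for the same key, coming from different
    // partitions, are merged with mergeCombiners.
    perPartition.flatten.groupBy(_._1).map { case (k, kcs) =>
      k -> kcs.map(_._2).reduce(mergeCombiners)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical partitioning of the five records from the example above.
    val parts = Seq(Seq(("A", 1), ("B", 1)), Seq(("A", 2), ("B", 2), ("C", 1)))
    val res = combineByKey[String, Int, String](parts,
      (v: Int) => v.toString + "_",
      (c: String, v: Int) => c + "@" + v,
      (c1: String, c2: String) => c1 + "$" + c2)
    // Per key: A and B each merge two partition-local combiners with "$";
    // C has a single value, so only createCombiner runs: C -> "1_"
    println(res)
  }
}
```

Note that "@" never appears in the result for this data: no key has two values in the same partition, so mergeValue is never called, only createCombiner and mergeCombiners.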
See another example, which transforms RDD[(String, Int)] into RDD[(String, List[Int])]:
rdd1.combineByKey(
  (v: Int) => List(v),
  (c: List[Int], v: Int) => v :: c,
  (c1: List[Int], c2: List[Int]) => c1 ::: c2
).collect
res: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
This function is used to fold the V values of an RDD[(K, V)] by key, using the mapping function func. The parameter zeroValue is the initial value: each V is first initialized by applying the mapping function to it together with zeroValue, and the mapping function is then applied to the initialized values.
See the example directly:

scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1)))

scala> rdd1.foldByKey(0)(_ + _).collect
res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
// rdd1 contains ("A",0) and ("A",2); here the values of each key are summed. Note zeroValue = 0:
// each V is first initialized; since the mapping function is +, we get ("A",0+0), ("A",2+0),
// i.e. ("A",0), ("A",2). The mapping function is then applied to the initialized values: (A,0+2), i.e. (A,2).
Look at another example:
scala> rdd1.foldByKey(2)(_ + _).collect
res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
// zeroValue = 2 is first applied to each V: ("A",0+2), ("A",2+2), i.e. ("A",2), ("A",4).
// The mapping function is then applied to the initialized values: (A,2+4), i.e. (A,6).
When using the foldByKey operator, pay special attention to how the mapping function interacts with the value of zeroValue.
scala> rdd1.foldByKey(0)(_ * _).collect
res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
// zeroValue = 0 is first applied to each V; note that the mapping function is now multiplication:
// ("A",0*0), ("A",2*0), i.e. ("A",0), ("A",0). The mapping function is then applied to the
// initialized values: (A,0*0), i.e. (A,0).
// The same holds for the other keys, so every V ends up as 0.
scala> rdd1.foldByKey(1)(_ * _).collect
res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
// When the mapping function is multiplication, zeroValue must be set to 1 to get the expected result.
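The fold semantics above can be sketched in plain Scala without Spark. In this sketch, zeroValue seeds the accumulator for each key in each partition, and the per-partition results are then merged with the same func; the split of the five records into two partitions is an assumption. With this partitioning the sketch reproduces all three results above, and it makes the zeroValue pitfall easy to experiment with.

```scala
// Sketch of foldByKey semantics over pre-partitioned local data.
// Assumption: input is a Seq of partitions, each a Seq of (K, V) pairs.
object FoldByKeySketch {
  def foldByKey[K, V](partitions: Seq[Seq[(K, V)]], zeroValue: V)
                     (func: (V, V) => V): Map[K, V] = {
    // Within each partition: the first V of a key starts from zeroValue;
    // later Vs for the same key are folded in with func.
    val perPartition: Seq[Map[K, V]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
        acc.updated(k, func(acc.getOrElse(k, zeroValue), v))
      }
    }
    // Across partitions: per-partition results for the same key are merged
    // with func (no further zeroValue is applied here).
    perPartition.flatten.groupBy(_._1).map { case (k, kvs) =>
      k -> kvs.map(_._2).reduce(func)
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical partitioning of the records from the foldByKey examples.
    val parts = Seq(Seq(("A", 0), ("B", 1)), Seq(("A", 2), ("B", 2), ("C", 1)))
    println(foldByKey(parts, 0)(_ + _)) // per key: A -> 2, B -> 3, C -> 1
    println(foldByKey(parts, 2)(_ + _)) // per key: A -> 6, B -> 7, C -> 3
    println(foldByKey(parts, 1)(_ * _)) // per key: A -> 0, B -> 2, C -> 1
    println(foldByKey(parts, 0)(_ * _)) // per key: everything collapses to 0
  }
}
```

The last two calls show the pitfall directly: with multiplication, zeroValue = 0 zeroes out every product, while zeroValue = 1 (the multiplicative identity) leaves the values intact.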