Spark operator: RDD Action (3) - aggregate, fold, lookup

Aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
The aggregate function aggregates the elements of the RDD: seqOp first aggregates the T-type elements within each partition into a value of type U, and combOp then aggregates the per-partition U values into a single U. Note in particular that both seqOp and combOp use the value of zeroValue, whose type is U.
  var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1.mapPartitionsWithIndex {
    (partIdx, iter) => {
      var part_map = scala.collection.mutable.Map[String, List[Int]]()
      while (iter.hasNext) {
        var part_name = "part_" + partIdx
        var elem = iter.next()
        if (part_map.contains(part_name)) {
          var elems = part_map(part_name)
          elems ::= elem
          part_map(part_name) = elems
        } else {
          part_map(part_name) = List[Int](elem)
        }
      }
      part_map.iterator
    }
  }.collect
  res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
## The first partition contains 5, 4, 3, 2, 1
## The second partition contains 10, 9, 8, 7, 6
  scala> rdd1.aggregate(1)(
       | {(x: Int, y: Int) => x + y},
       | {(a: Int, b: Int) => a + b}
       | )
  res17: Int = 58
The result is 58. Let's walk through the calculation:
## First, the elements within each partition are iterated with (x: Int, y: Int) => x + y, using zeroValue = 1
## That is, part_0: zeroValue + 5 + 4 + 3 + 2 + 1 = 1 + 5 + 4 + 3 + 2 + 1 = 16
## part_1: zeroValue + 10 + 9 + 8 + 7 + 6 = 1 + 10 + 9 + 8 + 7 + 6 = 41
## Then the results of the two partitions are combined with (a: Int, b: Int) => a + b, again using zeroValue = 1
## That is: zeroValue + part_0 + part_1 = 1 + 16 + 41 = 58
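The per-partition seqOp results can be reproduced directly. A minimal sketch against the same rdd1, where foldLeft stands in for Spark's internal per-partition iteration:
  scala> rdd1.mapPartitions(iter => Iterator(iter.foldLeft(1)(_ + _))).collect
  // each partition folds its elements starting from zeroValue = 1
  // expected: Array(16, 41), matching part_0 and part_1 above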
Another example:
  scala> rdd1.aggregate(2)(
       | {(x: Int, y: Int) => x + y},
       | {(a: Int, b: Int) => a * b}
       | )
  res18: Int = 1428
## This time zeroValue = 2
## part_0: zeroValue + 5 + 4 + 3 + 2 + 1 = 2 + 5 + 4 + 3 + 2 + 1 = 17
## part_1: zeroValue + 10 + 9 + 8 + 7 + 6 = 2 + 10 + 9 + 8 + 7 + 6 = 42
## Finally: zeroValue * part_0 * part_1 = 2 * 17 * 42 = 1428
Therefore, zeroValue not only determines the type of U but can also have a crucial impact on the result, so pay special attention when using it.
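Since zeroValue fixes the type U, aggregate can also produce a result type different from the element type T. As an illustrative sketch (the variable names are mine, not from the original), the average of rdd1 can be computed by aggregating into a (sum, count) pair:
  scala> val (sum, count) = rdd1.aggregate((0, 0))(
       |   (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: (U, T) => U
       |   (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: (U, U) => U
       | )
  scala> sum.toDouble / count
  // expected: 5.5 (sum = 55, count = 10)
Here the identity pair (0, 0) is a safe zeroValue: it is applied once per partition and once more when merging, but distorts neither the sum nor the count.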

Fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold is a simplified form of aggregate, using the same function op for both seqOp and combOp.
  scala> rdd1.fold(1)(
       | (x, y) => x + y
       | )
  res19: Int = 58

  ## The result is the same as the first aggregate example above:
  scala> rdd1.aggregate(1)(
       | {(x, y) => x + y},
       | {(a, b) => a + b}
       | )
  res20: Int = 58
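Like aggregate, fold applies zeroValue once inside each partition and once more when merging the partition results, so a zeroValue other than the identity shifts the result. An illustration with the same two-partition rdd1:
  scala> rdd1.fold(2)((x, y) => x + y)
  // part_0: 2 + (5 + 4 + 3 + 2 + 1) = 17, part_1: 2 + (10 + 9 + 8 + 7 + 6) = 42
  // merge:  2 + 17 + 42 = 61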

Lookup

def lookup(key: K): Seq[V]
lookup applies to RDDs of type (K, V): given a key, it returns all values in the RDD corresponding to that key.
  1. ("A", 0), ("A", 2), ("B", 1), ("B", 2), ("C", 1) ))
  2. Rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [0] at makeRDD at: 21
  3.  
  4. Scala> rdd1.lookup ("A")
  5. Res0: Seq [Int] = WrappedArray (0, 2)
  6.  
  7. Scala> rdd1.lookup ("B")
  8. Res1: Seq [Int] = WrappedArray (1, 2)
  9.  
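An implementation note: when the RDD has a known Partitioner, lookup only scans the partition the key maps to; otherwise it filters every partition. A hedged sketch (the choice of HashPartitioner with 2 partitions is arbitrary):
  scala> import org.apache.spark.HashPartitioner
  scala> val rdd2 = rdd1.partitionBy(new HashPartitioner(2)).cache()
  scala> rdd2.lookup("A")
  // only the partition that "A" hashes to is scanned
  // expected: WrappedArray(0, 2), as above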
For more information on Spark operators, refer to the Spark operator series.
