Spark operators: RDD key-value transformation operations (1) - partitionBy, mapValues, flatMapValues

partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)]
This function repartitions the original RDD according to the given partitioner and returns a new ShuffledRDD.
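Any object that extends org.apache.spark.Partitioner (implementing numPartitions and getPartition) can be passed in; the session below uses Spark's built-in HashPartitioner. As a minimal sketch (not part of the original example), a hypothetical custom partitioner that routes even Int keys to partition 0 and odd keys to partition 1 could look like this:

  import org.apache.spark.Partitioner

  // Hypothetical partitioner for illustration only; assumes non-negative Int keys.
  class EvenOddPartitioner extends Partitioner {
    override def numPartitions: Int = 2
    override def getPartition(key: Any): Int = key.asInstanceOf[Int] % 2
  }

  // Usage: pairRDD.partitionBy(new EvenOddPartitioner)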
  scala> var rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)
  rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at <console>:21

  scala> rdd1.partitions.size
  res20: Int = 2

  // View the elements of each partition in rdd1
  scala> rdd1.mapPartitionsWithIndex {
       |   (partIdx, iter) => {
       |     var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()
       |     while (iter.hasNext) {
       |       var part_name = "part_" + partIdx
       |       var elem = iter.next()
       |       if (part_map.contains(part_name)) {
       |         var elems = part_map(part_name)
       |         elems ::= elem
       |         part_map(part_name) = elems
       |       } else {
       |         part_map(part_name) = List[(Int, String)](elem)
       |       }
       |     }
       |     part_map.iterator
       |   }
       | }.collect
  Array((part_0, List((2,B), (1,A))), (part_1, List((4,D), (3,C))))
  // (2,B) and (1,A) are in part_0; (4,D) and (3,C) are in part_1

  // Repartition using partitionBy
  scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
  rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at <console>:23

  scala> rdd2.partitions.size
  res23: Int = 2

  // View the elements of each partition in rdd2
  scala> rdd2.mapPartitionsWithIndex {
       |   (partIdx, iter) => {
       |     var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()
       |     while (iter.hasNext) {
       |       var part_name = "part_" + partIdx
       |       var elem = iter.next()
       |       if (part_map.contains(part_name)) {
       |         var elems = part_map(part_name)
       |         elems ::= elem
       |         part_map(part_name) = elems
       |       } else {
       |         part_map(part_name) = List[(Int, String)](elem)
       |       }
       |     }
       |     part_map.iterator
       |   }
       | }.collect
  Array((part_0, List((4,D), (2,B))), (part_1, List((3,C), (1,A))))
  // (4,D) and (2,B) are in part_0; (3,C) and (1,A) are in part_1
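This layout follows from how HashPartitioner assigns partitions: essentially the key's hashCode modulo the number of partitions, adjusted to be non-negative. A quick illustrative calculation outside Spark (a sketch, not taken from the original post):

  // For Int keys, hashCode is the value itself, so with 2 partitions:
  val numPartitions = 2
  Seq(1, 2, 3, 4).map(k => (k, ((k.hashCode % numPartitions) + numPartitions) % numPartitions))
  // List((1,1), (2,0), (3,1), (4,0))  => keys 2 and 4 land in part_0, keys 1 and 3 in part_1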

mapValues

def mapValues[U](f: (V) => U): RDD[(K, U)]
Similar to map in the basic transformation operations, except that mapValues applies the function only to the V value of each [K, V] pair, leaving the keys unchanged.
  scala> var rdd1 = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)
  rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at makeRDD at <console>:21

  scala> rdd1.mapValues(x => x + "_").collect
  res: Array[(Int, String)] = Array((1,A_), (2,B_), (3,C_), (4,D_))
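A practical difference from map, not shown in the original session: because mapValues never touches the keys, Spark preserves any existing partitioner on the result, whereas an equivalent map drops it. A small sketch of that behaviour (assuming rdd1 from above):

  // mapValues keeps the partitioner; map discards it.
  val partitioned = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
  partitioned.mapValues(x => x + "_").partitioner              // Some(org.apache.spark.HashPartitioner@...)
  partitioned.map { case (k, v) => (k, v + "_") }.partitioner  // None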

flatMapValues

def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]
Similar to flatMap in the basic transformation operations, except that flatMapValues applies the flatMap only to the V value of each [K, V] pair; every resulting element is paired with the original key.
  scala> rdd1.flatMapValues(x => x + "_").collect
  res: Array[(Int, Char)] = Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))
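Because x + "_" produces a String, which Scala treats as a collection of characters, the pairs above hold single characters rather than strings. The function passed to flatMapValues only has to return a TraversableOnce, so it can also expand each value into several elements, each paired with the original key. A short sketch with made-up data (not from the original post):

  // Illustrative data: split each value on commas; every piece keeps its key.
  scala> var rdd3 = sc.makeRDD(Array((1, "A,B"), (2, "C,D,E")))
  scala> rdd3.flatMapValues(x => x.split(",")).collect
  // expected: Array((1,A), (1,B), (2,C), (2,D), (2,E))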
