Spark Operators: RDD Actions (4) - countByKey, foreach, foreachPartition, sortBy

countByKey

def countByKey(): Map[K, Long]
countByKey counts the number of elements for each key K in an RDD[K, V], returning the counts to the driver as a Map.
  1. ("B", 2), ("B", 1), ("B", 2), ("B", 3) ))
  2. Rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [7] at makeRDD at: 21
  3.  
  4. Scala> rdd1.countByKey
  5. Res5: scala.collection.Map [String, Long] = Map (A -> 2, B -> 3)
  6.  
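Because countByKey returns the per-key counts to the driver, it is only appropriate when the number of distinct keys is small. A minimal sketch of the usual alternative for large key sets, keeping the counts distributed with reduceByKey (assuming the same rdd1 as above):

  // Sketch: same counts as countByKey, but computed on the executors
  // and kept distributed until explicitly collected.
  val counts = rdd1.mapValues(_ => 1L)  // map each (K, V) to (K, 1L)
                   .reduceByKey(_ + _)  // sum the 1s per key
  counts.collect                        // e.g. Array((A,2), (B,3)); ordering may vary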

foreach

def foreach(f: (T) ⇒ Unit): Unit
foreach traverses the RDD, applying the function f to each element.
Note that when foreach is executed on an RDD, f runs only on the executors, not on the driver.
For example, rdd.foreach(println) prints only to the executors' stdout; nothing is visible on the driver side.
(This is what I observed on Spark 1.4; I do not know whether it holds in general.)
In such cases, combining foreach with an accumulator (a shared variable) is a good choice.
  scala> var cnt = sc.accumulator(0)
  cnt: org.apache.spark.Accumulator[Int] = 0

  scala> var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:21

  scala> rdd1.foreach(x => cnt += x)

  scala> cnt.value
  res51: Int = 55

  scala> rdd1.collect.foreach(println)
  1
  2
  3
  4
  5
  6
  7
  8
  9
  10
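The reason the accumulator is needed: a plain driver-side variable captured in the closure is serialized and shipped to each executor, so updates land on executor-local copies and never reach the driver. A minimal sketch of the pitfall (the variable names are illustrative):

  // Anti-pattern sketch: this does NOT reliably update the driver-side variable,
  // because each executor mutates its own deserialized copy of the closure.
  var sum = 0
  rdd1.foreach(x => sum += x)
  println(sum)        // in cluster mode this typically still prints 0

  // Correct: an accumulator, whose updates are merged back to the driver.
  val acc = sc.accumulator(0)
  rdd1.foreach(x => acc += x)
  println(acc.value)  // 55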

foreachPartition

def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition is similar to foreach, but applies f once to each partition's iterator instead of once per element.
  scala> var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:21

  scala> var allsize = sc.accumulator(0)
  allsize: org.apache.spark.Accumulator[Int] = 0

  scala> var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at <console>:21

  scala> rdd1.foreachPartition { x => {
       |   allsize += x.size
       | }}

  scala> println(allsize.value)
  10
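The typical use of foreachPartition is amortizing expensive per-record setup, such as a database or network connection, over a whole partition. A hedged sketch, where createConnection and sendToStore are hypothetical helpers standing in for whatever sink you use:

  // Sketch: open one (hypothetical) connection per partition, not per element.
  rdd1.foreachPartition { iter =>
    val conn = createConnection()              // hypothetical: open once per partition
    try {
      iter.foreach(x => sendToStore(conn, x))  // hypothetical: write each element
    } finally {
      conn.close()                             // always release the resource
    }
  }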

sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy sorts the elements of the RDD by the sort key produced by the given function f.
  scala> var rdd1 = sc.makeRDD(Seq(3, 6, 7, 1, 2, 0), 2)

  scala> rdd1.sortBy(x => x).collect
  res1: Array[Int] = Array(0, 1, 2, 3, 6, 7)   // ascending by default

  scala> rdd1.sortBy(x => x, false).collect
  res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)   // descending order

  // RDD[K, V] type
  scala> var rdd1 = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))

  scala> rdd1.sortBy(x => x).collect
  res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))

  // sort descending by V
  scala> rdd1.sortBy(x => x._2, false).collect
  res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
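Since sortBy takes any key function for which an Ordering exists, compound sorts come for free via tuple ordering. A small sketch, assuming the same pair RDD as above, sorting ascending by key and, within each key, descending by value:

  // Sketch: compound sort key (K ascending, V descending) via tuple Ordering.
  // Negating the Int value flips its order; assumes rdd1 from the example above.
  rdd1.sortBy(x => (x._1, -x._2)).collect
  // expected: Array((A,2), (A,1), (B,7), (B,6), (B,3))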
