Spark Operator: RDD Actions (4) - countByKey, foreach, foreachPartition, sortBy

countByKey

def countByKey(): Map[K, Long]
countByKey counts the number of occurrences of each key K in an RDD[K, V].
  1. ("B", 2), ("B", 1), ("B", 2), ("B", 3) ))
  2. Rdd1: org.apache.spark.rdd.RDD [(String, Int)] = ParallelCollectionRDD [7] at makeRDD at: 21
  3.  
  4. Scala> rdd1.countByKey
  5. Res5: scala.collection.Map [String, Long] = Map (A -> 2, B -> 3)
  6.  
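Because countByKey is an action that collects the per-key counts into a local Map on the Driver, it is only appropriate when the number of distinct keys is small. As a sketch of an alternative (my own example, not from the original post), the same counts can be kept as a distributed RDD with reduceByKey:

  // Keep the counts distributed instead of pulling a Map onto the Driver.
  val pairs = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("B", 3)))
  val counts = pairs.map { case (k, _) => (k, 1L) }  // count occurrences, ignore values
                    .reduceByKey(_ + _)              // sum the 1s per key
  counts.collect.foreach(println)                    // (A,2), (B,3)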

foreach

def foreach(f: (T) ⇒ Unit): Unit
foreach traverses the RDD, applying the function f to each element.
Be aware that foreach executes on the Executors, not on the Driver. For example, rdd.foreach(println) prints to each Executor's stdout; the output is not visible on the Driver side.
(I observed this on Spark 1.4; I do not know whether it holds in other versions.)
When you need the result back on the Driver, combining foreach with an accumulator shared variable is a good choice, as shown below.
  scala> var cnt = sc.accumulator(0)
  cnt: org.apache.spark.Accumulator[Int] = 0

  scala> var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

  scala> rdd1.foreach(x => cnt += x)

  scala> cnt.value
  res51: Int = 55

  scala> rdd1.collect.foreach(println)
  1
  2
  3
  4
  5
  6
  7
  8
  9
  10
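To make the Executor-versus-Driver point concrete, here is a minimal sketch (my own example) of the classic pitfall: mutating a plain local variable inside foreach only updates serialized copies of the closure on the Executors, so the Driver-side variable is left unchanged.

  // The closure is serialized and shipped to the Executors, so each
  // task increments its own copy of `sum`, not the Driver's variable.
  var sum = 0
  rdd1.foreach(x => sum += x)
  // On a cluster, `sum` is still 0 here; use an Accumulator (as above)
  // when the result is needed back on the Driver.
  println(sum)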

foreachPartition

def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition is similar to foreach, but applies f once to each partition (as an Iterator) rather than to each element.
  scala> var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21

  scala> var allsize = sc.accumulator(0)
  allsize: org.apache.spark.Accumulator[Int] = 0

  scala> var rdd1 = sc.makeRDD(1 to 10, 2)
  rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at :21

  scala> rdd1.foreachPartition { x => {
       |   allsize += x.size
       | }}

  scala> println(allsize.value)
  10
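The typical reason to prefer foreachPartition over foreach is to amortize expensive per-element setup, such as opening a database connection, across a whole partition. A minimal sketch of the pattern (createConnection and send are hypothetical placeholders, not a real API):

  rdd1.foreachPartition { iter =>
    val conn = createConnection()   // hypothetical: one connection per partition, not per element
    try {
      iter.foreach(x => conn.send(x.toString))  // hypothetical send
    } finally {
      conn.close()                  // release the resource even on failure
    }
  }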

sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy sorts the elements of the RDD according to the given key function f.
  scala> var rdd1 = sc.makeRDD(Seq(3, 6, 7, 1, 2, 0), 2)

  scala> rdd1.sortBy(x => x).collect
  res1: Array[Int] = Array(0, 1, 2, 3, 6, 7)  // ascending by default

  scala> rdd1.sortBy(x => x, false).collect
  res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  // descending

  // RDD[K, V] type
  scala> var rdd1 = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))

  scala> rdd1.sortBy(x => x).collect
  res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))

  // descending sort by V
  scala> rdd1.sortBy(x => x._2, false).collect
  res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
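For reference, sortBy is essentially a shorthand for keying each element by f, sorting by that key, and then dropping the key again. A sketch of the equivalence (my own illustration, using the pair RDD above):

  // Behaves like rdd1.sortBy(x => x._2, false): key by the value,
  // sort the keys in descending order, then recover the original tuples.
  val byValueDesc = rdd1.keyBy(x => x._2).sortByKey(false).values
  byValueDesc.collect.foreach(println)  // (B,7), (B,6), (B,3), (A,2), (A,1)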
