Spark Operators: RDD Actions (4) - countByKey, foreach, foreachPartition, sortBy
countByKey
def countByKey(): Map[K, Long]
countByKey counts the number of elements for each key K in an RDD[K, V].
- scala> var rdd1 = sc.makeRDD(Array(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("B", 3)))
- rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
- scala> rdd1.countByKey
- res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
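The same count can be reproduced on a plain Scala collection. This is a local sketch of what countByKey computes (not the Spark API itself): group the pairs by key, then count each group.

```scala
// Local-collection sketch of countByKey (no Spark required):
// group the (K, V) pairs by key, then count the entries per group.
object CountByKeyDemo extends App {
  val pairs = Seq(("A", 0), ("A", 2), ("B", 1), ("B", 2), ("B", 3))

  val counts: Map[String, Long] =
    pairs.groupBy(_._1)                             // Map(A -> Seq(...), B -> Seq(...))
      .map { case (k, vs) => k -> vs.size.toLong }  // keep only the group sizes

  println(counts) // counts == Map("A" -> 2L, "B" -> 3L)
}
```

The values of the pairs are ignored; only key occurrences are counted, which is why the result type is Map[K, Long] rather than another RDD.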
foreach
def foreach(f: (T) ⇒ Unit): Unit
foreach traverses the RDD, applying the function f to each element.
Note, however, that side effects inside foreach take place on the Executors, not on the Driver. For example, rdd.foreach(println) prints only to Executor stdout; nothing is visible on the Driver side. (Observed on Spark 1.4; I am not sure whether this holds in every version.)
In such cases, combining foreach with an accumulator shared variable is a good choice.
- scala> var cnt = sc.accumulator(0)
- cnt: org.apache.spark.Accumulator[Int] = 0
- scala> var rdd1 = sc.makeRDD(1 to 10, 2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
- scala> rdd1.foreach(x => cnt += x)
- scala> cnt.value
- res51: Int = 55
- scala> rdd1.collect.foreach(println)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
foreachPartition
def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition is similar to foreach, but applies f once to the iterator of each partition.
- scala> var rdd1 = sc.makeRDD(1 to 10, 2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
- scala> var allsize = sc.accumulator(0)
- allsize: org.apache.spark.Accumulator[Int] = 0
- scala> var rdd1 = sc.makeRDD(1 to 10, 2)
- rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at makeRDD at :21
- scala> rdd1.foreachPartition { x => {
- | allsize += x.size
- | }}
- scala> println(allsize.value)
- 10
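The per-partition behavior above can be imitated locally by splitting the data into chunks and applying the function once per chunk. This is a plain-Scala sketch (not the Spark API), which makes it clear why f runs only twice yet the accumulated size reaches 10:

```scala
// Local sketch of foreachPartition: two "partitions" of (1 to 10),
// with f applied once per chunk rather than once per element.
object ForeachPartitionDemo extends App {
  val data = (1 to 10).toSeq
  val partitions = data.grouped(5).toSeq // stands in for numPartitions = 2
  var allsize = 0                        // stands in for the accumulator
  var calls = 0
  partitions.foreach { it =>             // f receives a whole chunk at once
    calls += 1
    allsize += it.size
  }
  println(s"calls=$calls allsize=$allsize") // calls=2 allsize=10
}
```

In real Spark jobs, foreachPartition is usually preferred over foreach when each invocation needs expensive setup (for example, opening a database connection), because the setup then happens once per partition rather than once per element.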
sortBy
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy sorts the elements of the RDD by the key produced by the given function f.
- scala> var rdd1 = sc.makeRDD(Seq(3, 6, 7, 1, 2, 0), 2)
- scala> rdd1.sortBy(x => x).collect
- res1: Array[Int] = Array(0, 1, 2, 3, 6, 7)  // ascending by default
- scala> rdd1.sortBy(x => x, false).collect
- res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  // descending order
- // RDD[K, V] type
- scala> var rdd1 = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
- scala> rdd1.sortBy(x => x).collect
- res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
- // descending sort by V
- scala> rdd1.sortBy(x => x._2, false).collect
- res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
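Scala collections provide a sortBy of their own, so the RDD results above can be checked locally. A minimal sketch using plain Scala (no Spark); note that the collection version takes the direction via an implicit Ordering rather than an ascending flag:

```scala
// Local check of the sortBy examples using Scala collections.
object SortByDemo extends App {
  val nums = Seq(3, 6, 7, 1, 2, 0)
  val asc  = nums.sortBy(x => x)                        // ascending by default
  val desc = nums.sortBy(x => x)(Ordering[Int].reverse) // descending via reversed Ordering

  val pairs = Seq(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7))
  val byValueDesc = pairs.sortBy(_._2)(Ordering[Int].reverse) // sort by V, descending

  println(asc)         // List(0, 1, 2, 3, 6, 7)
  println(desc)        // List(7, 6, 3, 2, 1, 0)
  println(byValueDesc) // List((B,7), (B,6), (B,3), (A,2), (A,1))
}
```

The key-function idea is identical in both APIs: f maps each element to the value it is compared by, and tuples compare lexicographically, which is why sorting the pairs with x => x orders by key first and value second.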