Spark operators: RDD key-value transformation operations (1) - partitionBy, mapValues, flatMapValues
partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)]

This function repartitions the original RDD according to the given partitioner, creating a new ShuffledRDD.
- (2, "B"), (3, "C"), (4, "D")), 2)
- Rdd1: org.apache.spark.rdd.RDD [(Int, String)] = ParallelCollectionRDD [23] at makeRDD at: 21
- Scala> rdd1.partitions.size
- Res20: Int = 2
- // View the elements of each partition in rdd1 scala> rdd1.mapPartitionsWithIndex {
- | (PartIdx, iter) => {
- | Var part_map = scala.collection.mutable.Map [String, List [(Int, String)]] ()
- | While (iter.hasNext) {
- | Var part_name = "part_" + partIdx;
- Var elem = iter.next ()
- | If (part_map.contains (part_name)) {
- Var elems = part_map (part_name)
- Elems :: = elem
- | Part_map (part_name) = elems
- |} Else {
- | Part_map (part_name) = List [(Int, String)] {elem}
- |}
- |}
- | Part_map.iterator
- |
- |}
- |} .collect
- (2, B), (1, A))), (part_1, List ((4, D)), (3, C))))
- // (2, B), (1, A) in part_0, (4, D), (3, C) in part_1 // using partitionBy re-partitioning scala> var rdd2 = rdd1.partitionBy (new org.apache .spark.HashPartitioner (2))
- Rdd2: org.apache.spark.rdd.RDD [(Int, String)] = ShuffledRDD [25] at partitionBy at: 23
- Scala> rdd2.partitions.size
- Res23: Int = 2
- // View the elements of each partition in rdd2 scala> rdd2.mapPartitionsWithIndex {
- | (PartIdx, iter) => {
- | Var part_map = scala.collection.mutable.Map [String, List [(Int, String)]] ()
- | While (iter.hasNext) {
- | Var part_name = "part_" + partIdx;
- Var elem = iter.next ()
- | If (part_map.contains (part_name)) {
- Var elems = part_map (part_name)
- Elems :: = elem
- | Part_map (part_name) = elems
- |} Else {
- | Part_map (part_name) = List [(Int, String)] {elem}
- |}
- |}
- | Part_map.iterator
- |}
- |} .collect
- (2, B))), (part_1, List ((3, C)), () () () () () () () () () () (1, A))))
- // (4, D), (2, B) in part_0, (3, C), (1, A) in part_1
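Beyond HashPartitioner, partitionBy accepts any Partitioner implementation. As a supplementary sketch (not part of the original session, and assuming the usual spark-shell SparkContext `sc`), a hypothetical custom partitioner that routes even keys to partition 0 and odd keys to partition 1 could look like this:

```scala
import org.apache.spark.Partitioner

// Hypothetical custom partitioner: even Int keys go to partition 0, odd keys to partition 1.
class EvenOddPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case k: Int => if (k % 2 == 0) 0 else 1
    case _      => 0 // assumption: any non-Int key falls into partition 0
  }
}

// Assumes the spark-shell SparkContext `sc`, as in the session above.
val rdd = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")), 2)
val repartitioned = rdd.partitionBy(new EvenOddPartitioner)
// Expected layout: (2,B) and (4,D) in partition 0; (1,A) and (3,C) in partition 1.
```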
mapValues
def mapValues[U](f: (V) => U): RDD[(K, U)]

Similar to map among the basic transformation operations, except that mapValues applies the function only to the V value of each [K, V] pair.
- (2, "B"), (3, "C"), (4, "D")), 2)
- Rdd1: org.apache.spark.rdd.RDD [(Int, String)] = ParallelCollectionRDD [27] at makeRDD at: 21
- Scala> rdd1.mapValues (x => x + "_"). Collect
- Resend: Array [(Int, String)] = Array ((1, A_), (2, B_), (3, C_), (4, D_))
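A related point worth knowing (an addition, not from the original session): because mapValues leaves keys untouched, Spark preserves an existing partitioner across the transformation, whereas a plain map drops it. A minimal sketch, assuming the spark-shell `sc`:

```scala
// Assumes the spark-shell SparkContext `sc`.
val partitioned = sc.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")))
  .partitionBy(new org.apache.spark.HashPartitioner(2))

// mapValues cannot change keys, so the HashPartitioner is preserved.
println(partitioned.mapValues(_ + "_").partitioner)
// prints Some(org.apache.spark.HashPartitioner@...)

// A plain map could change keys, so the partitioner is discarded.
println(partitioned.map { case (k, v) => (k, v + "_") }.partitioner)
// prints None
```

This matters when a later stage, such as a join or reduceByKey on the same keys, could otherwise reuse the existing partitioning and avoid a shuffle.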
flatMapValues
def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]

Similar to flatMap among the basic transformation operations, except that flatMapValues applies flatMap only to the V value of each [K, V] pair.
```scala
scala> rdd1.flatMapValues(x => x + "_").collect
res27: Array[(Int, Char)] = Array((1,A), (1,_), (2,B), (2,_), (3,C), (3,_), (4,D), (4,_))
```
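Note that here f returns a String, which Spark flattens into its individual characters, hence the pairs like (1,_). More typically f returns a collection, with the key repeated for every element it produces; as a small illustrative sketch (not from the original post, assuming the spark-shell `sc`):

```scala
// Assumes the spark-shell SparkContext `sc`.
val pairs = sc.makeRDD(Array((1, "A,B"), (2, "C")))

// Each value is split into a collection; the key is duplicated for every element.
pairs.flatMapValues(_.split(",")).collect
// Array((1,A), (1,B), (2,C))
```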