Spark operators: RDD basic transformation operations (5) - mapPartitions, mapPartitionsWithIndex

mapPartitions

def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
This function is similar to map, except that the mapping function's parameter changes from each element of the RDD to an iterator over each partition of the RDD. If the mapping needs to create expensive objects repeatedly, mapPartitions is more efficient than map.
For example, suppose all of the data in an RDD is to be written to a database through a JDBC connection. With map, a connection would be created for every element, which is very expensive; with mapPartitions, only one connection per partition is needed.
The preservesPartitioning parameter indicates whether the partitioner of the parent RDD is retained.
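
A minimal sketch of that JDBC pattern, assuming a MySQL database with a single-column table t; the URL, credentials, table, and SQL below are placeholders, not part of the original example:

import java.sql.DriverManager

val rdd = sc.makeRDD(1 to 5, 2)

// One JDBC connection per partition instead of one per element
val written = rdd.mapPartitions { iter =>
  val conn = DriverManager.getConnection("jdbc:mysql://host:3306/db", "user", "pass")
  val stmt = conn.prepareStatement("INSERT INTO t (value) VALUES (?)")
  var n = 0
  while (iter.hasNext) {
    stmt.setInt(1, iter.next())
    stmt.executeUpdate()
    n += 1
  }
  stmt.close()
  conn.close()
  Iterator(n)  // number of rows written by this partition
}
written.collect()  // the action that actually triggers the writes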
scala> var rdd1 = sc.makeRDD(1 to 5, 2)

// rdd1 has two partitions
scala> var rdd3 = rdd1.mapPartitions{ x => {
     |   var result = List[Int]()
     |   var i = 0
     |   while (x.hasNext) {
     |     i += x.next()
     |   }
     |   result.::(i).iterator
     | }}
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at <console>:23

// rdd3 sums the values in each partition of rdd1
scala> rdd3.collect
res65: Array[Int] = Array(3, 12)

scala> rdd3.partitions.size
res66: Int = 2
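For comparison, the same per-partition sum can be written more compactly by consuming the iterator directly (rdd3b is just an illustrative name, not from the original post):

val rdd3b = rdd1.mapPartitions(iter => Iterator(iter.sum))
rdd3b.collect  // Array(3, 12), as above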

mapPartitionsWithIndex

def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
Similar to the mapPartitions function, but the mapping function takes two parameters; the first is the partition index.
scala> var rdd1 = sc.makeRDD(1 to 5, 2)

// rdd1 has two partitions
scala> var rdd2 = rdd1.mapPartitionsWithIndex{
     |   (x, iter) => {
     |     var result = List[String]()
     |     var i = 0
     |     while (iter.hasNext) {
     |       i += iter.next()
     |     }
     |     result.::(x + "|" + i).iterator
     |   }
     | }

// rdd2 sums the values in each partition of rdd1 and prefixes each sum with its partition index
scala> rdd2.collect
res13: Array[String] = Array(0|3, 1|12)
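Because it exposes the partition index, mapPartitionsWithIndex is also a convenient way to see which elements land in which partition. A small illustrative sketch (the tagging function is not from the original post):

val rdd1 = sc.makeRDD(1 to 5, 2)

// Tag each element with the index of the partition that holds it
val tagged = rdd1.mapPartitionsWithIndex { (idx, iter) =>
  iter.map(v => s"partition $idx: element $v")
}
tagged.collect.foreach(println)
// partition 0 holds elements 1 and 2 (sum 3); partition 1 holds 3, 4, 5 (sum 12),
// matching the Array(0|3, 1|12) result above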
