Spark operators: basic RDD transformations (6) - zip, zipPartitions

zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
The zip function combines two RDDs element by element into an RDD of key/value pairs. It requires both RDDs to have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
scala> var rdd1 = sc.makeRDD(1 to 10, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21

scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at <console>:21

scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))

scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))

scala> var rdd3 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at <console>:21

scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
// If the two RDDs have different numbers of partitions, an exception is thrown
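When the partition counts differ, the elements can still be paired by position, at the cost of a shuffle. A minimal workaround sketch of my own (not from the original post): key both RDDs on their element index with zipWithIndex, then join on that index.

// Sketch: pair rdd1 (2 partitions) with rdd3 (3 partitions) by element index.
// Assumes the rdd1 and rdd3 defined above are still in scope.
val byIndex1 = rdd1.zipWithIndex.map { case (v, i) => (i, v) } // (index, Int)
val byIndex3 = rdd3.zipWithIndex.map { case (v, i) => (i, v) } // (index, String)
val zipped = byIndex1.join(byIndex3) // causes a shuffle, unlike zip
  .sortByKey()
  .values                            // RDD[(Int, String)]
zipped.collect
// Array((1,A), (2,B), (3,C), (4,D), (5,E))

Unlike zip, this version shuffles the data, so it is considerably more expensive.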

zipPartitions

The zipPartitions function combines multiple RDDs into a new RDD partition by partition. It requires the combined RDDs to have the same number of partitions, but places no requirement on the number of elements within each partition.
The function has several overloads, which fall into three categories:
  • The parameter is one RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
The difference between the two is the preservesPartitioning parameter, which controls whether the partitioner of the parent RDD is kept on the result.
The mapping function f takes as parameters the iterators over the two RDDs' corresponding partitions.
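A minimal sketch of my own showing the effect of preservesPartitioning (assuming a Spark shell where sc is available): zipping a hash-partitioned pair RDD with itself keeps its partitioner only when the flag is set.

import org.apache.spark.HashPartitioner

// A pair RDD with a known partitioner
val pairRdd = sc.makeRDD(Seq((1, "A"), (2, "B"), (3, "C"), (4, "D")))
                .partitionBy(new HashPartitioner(2))

// With preservesPartitioning = true, the result keeps the parent's HashPartitioner
val kept = pairRdd.zipPartitions(pairRdd, preservesPartitioning = true) {
  (it1, it2) => it1.zip(it2).map { case ((k, v1), (_, v2)) => (k, v1 + v2) }
}
kept.partitioner    // Some(HashPartitioner)

// The default overload discards it
val dropped = pairRdd.zipPartitions(pairRdd) {
  (it1, it2) => it1.zip(it2).map { case ((k, v1), (_, v2)) => (k, v1 + v2) }
}
dropped.partitioner // None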
scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at <console>:21

// Element distribution of rdd1 across its two partitions:
scala> rdd1.mapPartitionsWithIndex {
     |   (x, iter) => {
     |     var result = List[String]()
     |     while (iter.hasNext) {
     |       result ::= ("part_" + x + "|" + iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)

// Element distribution of rdd2 across its two partitions:
scala> rdd2.mapPartitionsWithIndex {
     |   (x, iter) => {
     |     var result = List[String]()
     |     while (iter.hasNext) {
     |       result ::= ("part_" + x + "|" + iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)

// zipPartitions on rdd1 and rdd2
scala> rdd1.zipPartitions(rdd2) {
     |   (rdd1Iter, rdd2Iter) => {
     |     var result = List[String]()
     |     while (rdd1Iter.hasNext && rdd2Iter.hasNext) {
     |       result ::= (rdd1Iter.next() + "_" + rdd2Iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)

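Since only the partition counts must match, zipPartitions also works when corresponding partitions hold different numbers of elements; the mapping function f decides what to do with the leftovers. A small sketch of my own illustrating this:

// rddA has 6 elements and rddB has 4, but both have 2 partitions,
// so zipPartitions is legal here even though zip would throw.
val rddA = sc.makeRDD(1 to 6, 2)                  // part_0: 1,2,3  part_1: 4,5,6
val rddB = sc.makeRDD(Seq("A", "B", "C", "D"), 2) // part_0: A,B    part_1: C,D

val padded = rddA.zipPartitions(rddB) {
  (aIter, bIter) =>
    // Pad the shorter side with "-" instead of dropping elements
    aIter.map { a =>
      val b = if (bIter.hasNext) bIter.next() else "-"
      a + "_" + b
    }
}
padded.collect
// Array(1_A, 2_B, 3_-, 4_C, 5_D, 6_-)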
  • The parameters are two RDDs
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
Usage is the same as above, except that two other RDDs are passed in, so the mapping function f takes three partition iterators as input.
scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at <console>:21

scala> var rdd3 = sc.makeRDD(Seq("a", "b", "c", "d", "e"), 2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at <console>:21

// Element distribution of rdd3 across its partitions:
scala> rdd3.mapPartitionsWithIndex {
     |   (x, iter) => {
     |     var result = List[String]()
     |     while (iter.hasNext) {
     |       result ::= ("part_" + x + "|" + iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)

// zipPartitions over the three RDDs
scala> var rdd4 = rdd1.zipPartitions(rdd2, rdd3) {
     |   (rdd1Iter, rdd2Iter, rdd3Iter) => {
     |     var result = List[String]()
     |     while (rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
     |       result ::= (rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
     |     }
     |     result.iterator
     |   }
     | }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at <console>:27

scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

  • The parameters are three RDDs
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
Usage is the same as above, with just one more RDD added.
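The post gives no example for this variant, so here is a minimal sketch of my own, reusing rdd1, rdd2, and rdd3 from above plus a hypothetical fourth RDD rdd5:

// A fourth RDD with the same number of partitions (hypothetical)
val rdd5 = sc.makeRDD(Seq("v", "w", "x", "y", "z"), 2)

val rdd6 = rdd1.zipPartitions(rdd2, rdd3, rdd5) {
  (iter1, iter2, iter3, iter4) =>
    var result = List[String]()
    while (iter1.hasNext && iter2.hasNext && iter3.hasNext && iter4.hasNext) {
      result ::= (iter1.next() + "_" + iter2.next() + "_" + iter3.next() + "_" + iter4.next())
    }
    result.iterator
}
rdd6.collect
// expected: Array(2_B_b_w, 1_A_a_v, 5_E_e_z, 4_D_d_y, 3_C_c_x)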
