Spark operators: basic RDD transformation operations (6) - zip, zipPartitions
zip
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

The zip function combines two RDDs into a single RDD of key/value pairs. It requires the two RDDs to have the same number of partitions and the same number of elements in each partition; otherwise an exception is thrown.
scala> var rdd1 = sc.makeRDD(1 to 10, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21

scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at <console>:21

scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))

scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))

scala> var rdd3 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at <console>:21

scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
// If the two RDDs have different numbers of partitions, an exception is thrown
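Since the partition-count check only happens at runtime, it can be worth guarding before calling zip. A minimal sketch of such a guard (zipByIndex is a hypothetical helper name, not a Spark API), assuming the same spark-shell session:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical helper: zip directly when the partition counts match;
// otherwise align the two RDDs by element index. The index-based fallback
// always works but costs a shuffle for the join and the sort, and it
// truncates to the indices present in both RDDs instead of throwing.
// Note: equal partition counts still do not guarantee equal per-partition
// element counts, so the direct a.zip(b) branch can fail at action time.
def zipByIndex[T: ClassTag, U: ClassTag](a: RDD[T], b: RDD[U]): RDD[(T, U)] =
  if (a.partitions.length == b.partitions.length) a.zip(b)
  else a.zipWithIndex.map(_.swap).join(b.zipWithIndex.map(_.swap)).sortByKey().values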
zipPartitions
The zipPartitions function combines multiple RDDs into a new RDD partition by partition. It requires the combined RDDs to have the same number of partitions, but not the same number of elements within each partition. The function has several overloads, which fall into three categories:
- The parameter is one RDD
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
The difference between the two overloads is the preservesPartitioning parameter, which indicates whether the result keeps the parent RDD's partitioner information. The mapping function f takes the iterators of the two RDDs as parameters.
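As a quick illustration of preservesPartitioning, here is a sketch assuming the same spark-shell session (pairs1, pairs2, and summed are names introduced here for illustration). When both inputs share a partitioner and f does not change the keys, passing preservesPartitioning = true lets the result keep that partitioner; with the default false it is dropped:

import org.apache.spark.HashPartitioner

// Two pair RDDs sharing a HashPartitioner; mapValues keeps the partitioner,
// so both inputs have identically distributed keys in the same order.
var pairs1 = sc.makeRDD(Seq(("A", 1), ("B", 2), ("C", 3), ("D", 4))).partitionBy(new HashPartitioner(2))
var pairs2 = pairs1.mapValues(_ * 10)

// f keeps the keys intact, so it is safe to declare the partitioning preserved.
var summed = pairs1.zipPartitions(pairs2, preservesPartitioning = true) {
  (leftIter, rightIter) =>
    leftIter.zip(rightIter).map { case ((k, v1), (_, v2)) => (k, v1 + v2) }
}

summed.partitioner  // Some(HashPartitioner) -- with false it would be None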
scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at <console>:21

// rdd1 element distribution across the two partitions:
scala> rdd1.mapPartitionsWithIndex {
     |   (x, iter) => {
     |     var result = List[String]()
     |     while (iter.hasNext) {
     |       result ::= ("part_" + x + "|" + iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)
// rdd2 element distribution across the two partitions:
scala> rdd2.mapPartitionsWithIndex {
     |   (x, iter) => {
     |     var result = List[String]()
     |     while (iter.hasNext) {
     |       result ::= ("part_" + x + "|" + iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)
// zipPartitions of rdd1 and rdd2:
scala> rdd1.zipPartitions(rdd2) {
     |   (rdd1Iter, rdd2Iter) => {
     |     var result = List[String]()
     |     while (rdd1Iter.hasNext && rdd2Iter.hasNext) {
     |       result ::= (rdd1Iter.next() + "_" + rdd2Iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)
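Unlike zip, zipPartitions only requires equal partition counts, not equal element counts per partition. A small sketch continuing the session above (rdd5 is a new name introduced here), where the second RDD holds six elements in the same two partitions:

// rdd5 has 6 elements in 2 partitions while rdd1 has 5; zip would throw
// at action time here, but zipPartitions lets f decide what to do with
// the leftover elements (this f simply stops at the shorter iterator).
var rdd5 = sc.makeRDD(1 to 6, 2)
rdd1.zipPartitions(rdd5) {
  (aIter, bIter) => {
    var result = List[String]()
    while (aIter.hasNext && bIter.hasNext) {
      result ::= (aIter.next() + "_" + bIter.next())
    }
    result.iterator
  }
}.collect
// Expected (pairs truncated to the shorter iterator in each partition):
// Array(2_2, 1_1, 5_6, 4_5, 3_4)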
- The parameters are two RDDs
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
Usage is the same as above, except that two RDDs are passed as parameters and the mapping function f takes the iterators of all three RDDs as input.
scala> var rdd1 = sc.makeRDD(1 to 5, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at <console>:21

scala> var rdd2 = sc.makeRDD(Seq("A", "B", "C", "D", "E"), 2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at <console>:21

scala> var rdd3 = sc.makeRDD(Seq("a", "b", "c", "d", "e"), 2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at <console>:21

// rdd3 element distribution across the two partitions:
scala> rdd3.mapPartitionsWithIndex {
     |   (x, iter) => {
     |     var result = List[String]()
     |     while (iter.hasNext) {
     |       result ::= ("part_" + x + "|" + iter.next())
     |     }
     |     result.iterator
     |   }
     | }.collect
res22: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)
// zipPartitions of the three RDDs:
scala> var rdd4 = rdd1.zipPartitions(rdd2, rdd3) {
     |   (rdd1Iter, rdd2Iter, rdd3Iter) => {
     |     var result = List[String]()
     |     while (rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {
     |       result ::= (rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())
     |     }
     |     result.iterator
     |   }
     | }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at <console>:27

scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)
- The parameters are three RDDs
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
Usage is the same as above; the only difference is that one more RDD is added, so the mapping function f takes four iterators as input (a sketch follows below).
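By analogy with the three-RDD example above, a minimal sketch with four RDDs, continuing the same session (rdd6 and rdd7 are names introduced here for illustration):

// A fourth RDD with the same number of partitions as rdd1, rdd2, rdd3.
var rdd6 = sc.makeRDD(Seq("V", "W", "X", "Y", "Z"), 2)
var rdd7 = rdd1.zipPartitions(rdd2, rdd3, rdd6) {
  (rdd1Iter, rdd2Iter, rdd3Iter, rdd6Iter) => {
    var result = List[String]()
    while (rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext && rdd6Iter.hasNext) {
      result ::= (rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next() + "_" + rdd6Iter.next())
    }
    result.iterator
  }
}
rdd7.collect
// Expected, following the pattern of the previous examples:
// Array(2_B_b_W, 1_A_a_V, 5_E_e_Z, 4_D_d_Y, 3_C_c_X)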