Spark operators: RDD creation operations

Create an RDD from a collection

Parallelize

  scala> var rdd = sc.parallelize(1 to 10)
  rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

  scala> rdd.collect
  res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  // without an explicit slice count, the number of partitions defaults to spark.default.parallelism (15 on this cluster)
  scala> rdd.partitions.size
  res4: Int = 15

  // set the RDD to 3 partitions
  scala> var rdd2 = sc.parallelize(1 to 10, 3)
  rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:21

  scala> rdd2.collect
  res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  scala> rdd2.partitions.size
  res6: Int = 3


  MakeRDD

  scala> var collect = Seq((1 to 10, Seq("slave007.lxw1234.com", "slave002.lxw1234.com")),
  (11 to 15, Seq("slave013.lxw1234.com", "slave015.lxw1234.com")))
  collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))

  scala> var rdd = sc.makeRDD(collect)
  rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at <console>:23

  scala> rdd.partitions.size
  res33: Int = 2

  scala> rdd.preferredLocations(rdd.partitions(0))
  res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)

  scala> rdd.preferredLocations(rdd.partitions(1))
  res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)

  The makeRDD overload used here takes a Seq of (values, preferred locations) pairs, so each partition carries a list of preferred hosts; called with a plain Seq[T], makeRDD behaves the same as parallelize.


Create an RDD from external storage

TextFile

  // create an RDD from an HDFS file
  scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")
  rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at <console>:21

  scala> rdd.count
  res48: Long = 4

  // create an RDD from a local file
  scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
  rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at <console>:21

  scala> rdd.count
  res49: Long = 97

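textFile, like parallelize, also accepts a minimum number of partitions as a second argument. A minimal sketch, reusing the HDFS path from the example above:

  // ask for at least 3 partitions when reading the file
  scala> var rdd3 = sc.textFile("hdfs:///tmp/lxw1234/1.txt", 3)
  scala> rdd3.partitions.size
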

Create from other HDFS file formats (a sequenceFile/objectFile sketch follows this list)

HadoopFile
SequenceFile
ObjectFile
NewAPIHadoopFile
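
As a rough sketch of sequenceFile and objectFile (the paths below are hypothetical, not from the original examples), a pair RDD can be written out and read back like this:

  // write a pair RDD as a Hadoop SequenceFile, then read it back
  scala> val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
  scala> pairs.saveAsSequenceFile("hdfs:///tmp/lxw1234/seq")
  scala> val seqRDD = sc.sequenceFile[String, Int]("hdfs:///tmp/lxw1234/seq")

  // write the same RDD as serialized Java objects, then read it back
  scala> pairs.saveAsObjectFile("hdfs:///tmp/lxw1234/obj")
  scala> val objRDD = sc.objectFile[(String, Int)]("hdfs:///tmp/lxw1234/obj")
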
Create from the Hadoop interface API
HadoopRDD
NewAPIHadoopRDD
For example, create an RDD from HBase:

  scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
  import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

  scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat

  scala> import org.apache.hadoop.hbase.client.HBaseAdmin
  import org.apache.hadoop.hbase.client.HBaseAdmin

  scala> val conf = HBaseConfiguration.create()
  scala> conf.set(TableInputFormat.INPUT_TABLE, "lxw1234")
  scala> var hbaseRDD = sc.newAPIHadoopRDD(
      conf, classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

  scala> hbaseRDD.count
  res52: Long = 1

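hadoopRDD is the old mapred-API counterpart of newAPIHadoopRDD. A minimal sketch reading a plain text file through that interface (the input path simply reuses the earlier textFile example):

  scala> import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileInputFormat}
  scala> import org.apache.hadoop.io.{LongWritable, Text}

  // build an old-API JobConf pointing at the input path
  scala> val jobConf = new JobConf()
  scala> FileInputFormat.setInputPaths(jobConf, "hdfs:///tmp/lxw1234/1.txt")

  // each element is a (byte offset, line of text) pair
  scala> var hadoopRdd = sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  scala> hadoopRdd.map(_._2.toString).count
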

 
