Spark Operator: RDD Action Operations (6) - saveAsHadoopFile, saveAsHadoopDataset

saveAsHadoopFile

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = ..., codec: Option[Class[_ <: CompressionCodec]] = None): Unit

saveAsHadoopFile stores the RDD as files on HDFS using the old-version Hadoop API (org.apache.hadoop.mapred); a sketch of the new-API counterpart, saveAsNewAPIHadoopFile, follows the example below.
You can specify the output key class, output value class, output format, and compression codec.
Each partition is written to one output file.
  1. ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
  2.  
  3. Import org.apache.hadoop.mapred.TextOutputFormat
  4. Import org.apache.hadoop.io.Text
  5. Import org.apache.hadoop.io.IntWritable
  6.  
  7. Rdd1.saveAsHadoopFile ("/ tmp / lxw1234.com /", classOf [Text], classOf [IntWritable], classOf [TextOutputFormat [Text, IntWritable]])
  8.  
  9. Rdd1.saveAsHadoopFile ("/ tmp / lxw1234.com /", classOf [Text], classOf [IntWritable], classOf [TextOutputFormat [Text, IntWritable]],
  10. ClassOf [com.hadoop.compression.lzo.LzopCodec])
  11.  
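
For comparison, the counterpart for the new Hadoop API (org.apache.hadoop.mapreduce) is saveAsNewAPIHadoopFile. A minimal sketch, reusing the same rdd1 as above; the output path /tmp/lxw1234.com.newapi/ is made up for illustration:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}

    // Same data as above, written through the new Hadoop API.
    // The output path is hypothetical; any non-existing HDFS directory works.
    rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234.com.newapi/", classOf[Text], classOf[IntWritable],
      classOf[NewTextOutputFormat[Text, IntWritable]])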

saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit

saveAsHadoopDataset is used to save an RDD to storage systems other than HDFS, such as HBase.
In the JobConf, you usually need to set five things:
the file output path, the key class, the value class, the output format of the RDD, and any compression-related parameters (a sketch of enabling a compression codec through the JobConf follows the HDFS example below).
## Use saveAsHadoopDataset to save an RDD to HDFS

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import SparkContext._
    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.IntWritable
    import org.apache.hadoop.mapred.JobConf

    var rdd1 = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
    var jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    jobConf.setOutputKeyClass(classOf[Text])
    jobConf.setOutputValueClass(classOf[IntWritable])
    jobConf.set("mapred.output.dir", "/tmp/lxw1234/")
    rdd1.saveAsHadoopDataset(jobConf)

Result:

    hadoop fs -cat /tmp/lxw1234/part-00000
    A       2
    A       1
    hadoop fs -cat /tmp/lxw1234/part-00001
    B       6
    B       3
    B       7
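
The compression-related parameters mentioned above can be set on the same JobConf before calling saveAsHadoopDataset. A minimal sketch, reusing the jobConf and rdd1 from the example; GzipCodec and the fresh output directory /tmp/lxw1234.gz/ are illustrative choices:

    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.hadoop.mapred.FileOutputFormat

    // Point the job at a fresh directory (the previous one already exists)
    // and enable output compression with a codec of your choice.
    jobConf.set("mapred.output.dir", "/tmp/lxw1234.gz/")
    FileOutputFormat.setCompressOutput(jobConf, true)
    FileOutputFormat.setOutputCompressorClass(jobConf, classOf[GzipCodec])

    rdd1.saveAsHadoopDataset(jobConf)
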
## Save the data to HBase
HBase table:
create 'lxw1234', {NAME => 'f1', VERSIONS => 1}, {NAME => 'f2', VERSIONS => 1}, {NAME => 'f3', VERSIONS => 1}

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import SparkContext._
    import org.apache.hadoop.mapred.TextOutputFormat
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.IntWritable
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable

    var conf = HBaseConfiguration.create()
    var jobConf = new JobConf(conf)
    jobConf.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
    jobConf.set("zookeeper.znode.parent", "/hbase")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    var rdd1 = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))
    rdd1.map(x => {
      var put = new Put(Bytes.toBytes(x._1))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)

Result:

    hbase(main):005:0> scan 'lxw1234'
    ROW     COLUMN+CELL
     A      column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
     B      column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
     C      column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
    3 row(s) in 0.0550 seconds
Note: when saving to HBase, the HBase-related jar packages need to be added to SPARK_CLASSPATH at run time.
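
For reference, the same HBase write can be expressed with the new Hadoop API, using org.apache.hadoop.hbase.mapreduce.TableOutputFormat and saveAsNewAPIHadoopDataset. A minimal sketch under the same assumptions as above (table lxw1234, ZooKeeper quorum zkNode1,zkNode2,zkNode3); in newer HBase versions Put.add is replaced by Put.addColumn:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}
    import org.apache.hadoop.mapreduce.Job

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
    hbaseConf.set(NewTableOutputFormat.OUTPUT_TABLE, "lxw1234")

    // Carry the output format class in the job configuration.
    val job = Job.getInstance(hbaseConf)
    job.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])

    sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7))).map { x =>
      val put = new Put(Bytes.toBytes(x._1))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))  // put.addColumn in newer HBase
      (new ImmutableBytesWritable, put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)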
