Spark Operator: RDD Action (7) - saveAsNewAPIHadoopFile, saveAsNewAPIHadoopDataset

saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit

saveAsNewAPIHadoopFile is used to save RDD data to HDFS, using the new version of the Hadoop API.
Its usage is basically the same as saveAsHadoopFile.
  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import SparkContext._
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
  import org.apache.hadoop.io.Text
  import org.apache.hadoop.io.IntWritable

  var rdd1 = sc.makeRDD(Array(("A", 1), ("B", 6), ("B", 3), ("B", 7)))
  // Save as text files under /tmp/lxw1234/ using the new Hadoop API TextOutputFormat
  rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/", classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])
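The first, type-parameterized signature can do the same thing without passing the key, value, and output format classes explicitly. The following is only a minimal sketch, assuming the rdd1 above and an illustrative output path /tmp/lxw1234_new/:

  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

  // The OutputFormat is supplied as the type parameter F; the RDD's key and
  // value types (String, Int) are used as K and V.
  rdd1.saveAsNewAPIHadoopFile[TextOutputFormat[String, Int]]("/tmp/lxw1234_new/")

  // The new-API TextOutputFormat writes one "key<TAB>value" line per record,
  // so the output can be checked by reading the directory back as text.
  sc.textFile("/tmp/lxw1234_new/").collect().foreach(println)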

saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

Its role is the same as saveAsHadoopDataset, but it uses the new version of the Hadoop API.
Take writing to HBase as an example:
The HBase table:

  create 'lxw1234', {NAME => 'f1', VERSIONS => 1}, {NAME => 'f2', VERSIONS => 1}, {NAME => 'f3', VERSIONS => 1}
Complete Spark application:
  package com.lxw1234.test

  import org.apache.spark.SparkConf
  import org.apache.spark.SparkContext
  import SparkContext._
  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.mapreduce.Job
  import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.hadoop.hbase.client.Put

  object test {
    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
      val sc = new SparkContext(sparkConf)
      var rdd1 = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))

      // ZooKeeper and target-table settings for HBase
      sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
      sc.hadoopConfiguration.set("zookeeper.znode.parent", "/hbase")
      sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")

      // Configure the output format through a Hadoop Job (new API)
      var job = new Job(sc.hadoopConfiguration)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Result])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      // Turn each record into a Put on column family f1, qualifier c1
      rdd1.map(
        x => {
          var put = new Put(Bytes.toBytes(x._1))
          put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
          (new ImmutableBytesWritable, put)
        }
      ).saveAsNewAPIHadoopDataset(job.getConfiguration)

      sc.stop()
    }
  }
Note: when saving to HBase, the HBase-related jar packages need to be added to SPARK_CLASSPATH at run time.
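To check the rows written above, the data can also be read back through the new Hadoop API with newAPIHadoopRDD and HBase's TableInputFormat. This is only a sketch, assuming the same sc and ZooKeeper settings as in the application above:

  import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  import org.apache.hadoop.hbase.client.Result
  import org.apache.hadoop.hbase.util.Bytes

  // Scan the same table that saveAsNewAPIHadoopDataset wrote to
  sc.hadoopConfiguration.set(TableInputFormat.INPUT_TABLE, "lxw1234")

  // Records come back as (row key, Result) pairs
  val hbaseRdd = sc.newAPIHadoopRDD(sc.hadoopConfiguration,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result])

  // Extract column f1:c1 and convert it back to the Int that was stored
  hbaseRdd.map { case (key, result) =>
    val rowKey = Bytes.toString(key.get())
    val value = Bytes.toInt(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("c1")))
    (rowKey, value)
  }.collect().foreach(println)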
