Spark Operator: RDD Actions (7) - saveAsNewAPIHadoopFile, saveAsNewAPIHadoopDataset
saveAsNewAPIHadoopFile
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile is used to save RDD data to HDFS using the new version of the Hadoop API.
Usage is basically the same as saveAsHadoopFile.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable

var rdd1 = sc.makeRDD(Array(("A", 1), ("B", 6), ("B", 3), ("B", 7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/", classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])
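Since the first overload takes only the OutputFormat as a type parameter, the same save can also be written more compactly. A minimal sketch, assuming the rdd1 above; the output path here is only illustrative and must not already exist:

// Shorthand form: the OutputFormat is given as a type parameter and the
// key/value classes are inferred from the RDD's element types.
// TextOutputFormat writes keys and values via toString, so plain
// String/Int pairs work without converting to Writable first.
rdd1.saveAsNewAPIHadoopFile[TextOutputFormat[String, Int]]("/tmp/lxw1234_new/")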
saveAsNewAPIHadoopDataset
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit

Same role as saveAsHadoopDataset, but it uses the new version of the Hadoop API.
Take writing to HBase as an example.
HBase table:
create 'lxw1234', {NAME => 'f1', VERSIONS => 1}, {NAME => 'f2', VERSIONS => 1}, {NAME => 'f3', VERSIONS => 1}
Complete Spark application:
Note: when saving to HBase, the HBase-related jars need to be added to SPARK_CLASSPATH at run time.
package com.lxw1234.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put

object test {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
    val sc = new SparkContext(sparkConf)
    var rdd1 = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))

    // Point the Hadoop configuration at the HBase cluster and target table
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
    sc.hadoopConfiguration.set("zookeeper.znode.parent", "/hbase")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")

    var job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Convert each (key, value) pair into an HBase Put on column family f1, qualifier c1
    rdd1.map(
      x => {
        var put = new Put(Bytes.toBytes(x._1))
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
        (new ImmutableBytesWritable, put)
      }
    ).saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
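One note on the Job construction above: in newer Hadoop releases, new Job(conf) is deprecated in favor of the Job.getInstance factory method. A minimal sketch of that variant, assuming a Hadoop version where it is available; the rest of the setup is unchanged:

// Job.getInstance is the non-deprecated replacement for new Job(conf)
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])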