Spark Operator: RDD Action (6) - saveAsHadoopFile, saveAsHadoopDataset
saveAsHadoopFile
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = ..., codec: Option[Class[_ <: CompressionCodec]] = None): Unit
saveAsHadoopFile stores the RDD as files on HDFS, using the old-style Hadoop API (org.apache.hadoop.mapred).
You can specify the outputKeyClass, the outputValueClass, the OutputFormat, and an optional compression codec.
Each partition is written out as one file.
- ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
- Import org.apache.hadoop.mapred.TextOutputFormat
- Import org.apache.hadoop.io.Text
- Import org.apache.hadoop.io.IntWritable
- Rdd1.saveAsHadoopFile ("/ tmp / lxw1234.com /", classOf [Text], classOf [IntWritable], classOf [TextOutputFormat [Text, IntWritable]])
- Rdd1.saveAsHadoopFile ("/ tmp / lxw1234.com /", classOf [Text], classOf [IntWritable], classOf [TextOutputFormat [Text, IntWritable]],
- ClassOf [com.hadoop.compression.lzo.LzopCodec])
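As a quick check, the saved output can be read back with sc.textFile. This is only an illustrative sketch, assuming the same spark-shell session and the first (uncompressed) save above; the value name check is arbitrary, and TextOutputFormat writes each record as key<TAB>value:
- // hedged sketch: read the saved files back to verify their contents
- val check = sc.textFile("/tmp/lxw1234.com/")
- check.collect().foreach(println)
- // each printed line should look like "A<TAB>2", "B<TAB>6", ...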
saveAsHadoopDataset
def saveAsHadoopDataset(conf: JobConf): Unit
saveAsHadoopDataset is used to save an RDD to storage systems other than HDFS, such as HBase.
In the JobConf you usually need to pay attention to, or set, five parameters:
the output path of the files, the class of the key, the class of the value, the OutputFormat of the RDD, and the compression-related settings.
## Use saveAsHadoopDataset to save an RDD to HDFS
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import SparkContext._
- import org.apache.hadoop.mapred.TextOutputFormat
- import org.apache.hadoop.io.Text
- import org.apache.hadoop.io.IntWritable
- import org.apache.hadoop.mapred.JobConf
- var rdd1 = sc.makeRDD(Array(("A", 2), ("A", 1), ("B", 6), ("B", 3), ("B", 7)))
- var jobConf = new JobConf()
- jobConf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
- jobConf.setOutputKeyClass(classOf[Text])
- jobConf.setOutputValueClass(classOf[IntWritable])
- jobConf.set("mapred.output.dir", "/tmp/lxw1234/")
- rdd1.saveAsHadoopDataset(jobConf)
- result:
- hadoop fs -cat /tmp/lxw1234/part-00000
- A 2
- A 1
- hadoop fs -cat /tmp/lxw1234/part-00001
- B 6
- B 3
- B 7
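The JobConf above sets four of the five parameters mentioned earlier; compression was left out. As a hedged sketch, assuming the standard GzipCodec that ships with Hadoop, the compression-related settings could be added to the same jobConf before calling saveAsHadoopDataset:
- // hedged sketch: enable compressed output via the old-API FileOutputFormat helpers
- import org.apache.hadoop.io.compress.GzipCodec
- import org.apache.hadoop.mapred.FileOutputFormat
- FileOutputFormat.setCompressOutput(jobConf, true)
- FileOutputFormat.setOutputCompressorClass(jobConf, classOf[GzipCodec])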
## Save the data to HBase
HBase table:
create 'lxw1234', {NAME => 'f1', VERSIONS => 1}, {NAME => 'f2', VERSIONS => 1}, {NAME => 'f3', VERSIONS => 1}
Note: when saving to HBase, the HBase-related JARs must be added to SPARK_CLASSPATH at runtime.
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import SparkContext._
- import org.apache.hadoop.mapred.TextOutputFormat
- import org.apache.hadoop.io.Text
- import org.apache.hadoop.io.IntWritable
- import org.apache.hadoop.mapred.JobConf
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.mapred.TableOutputFormat
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
- var conf = HBaseConfiguration.create()
- var jobConf = new JobConf(conf)
- jobConf.set("hbase.zookeeper.quorum", "zkNode1,zkNode2,zkNode3")
- jobConf.set("zookeeper.znode.parent", "/hbase")
- jobConf.set(TableOutputFormat.OUTPUT_TABLE, "lxw1234")
- jobConf.setOutputFormat(classOf[TableOutputFormat])
- var rdd1 = sc.makeRDD(Array(("A", 2), ("B", 6), ("C", 7)))
- rdd1.map(x => {
-     var put = new Put(Bytes.toBytes(x._1))
-     put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
-     (new ImmutableBytesWritable, put)
- }).saveAsHadoopDataset(jobConf)
- ## result:
- hbase(main):005:0> scan 'lxw1234'
- ROW    COLUMN+CELL
- A      column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
- B      column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
- C      column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
- 3 row(s) in 0.0550 seconds
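A short note on the scan output above: because the example stores the Int value with Bytes.toBytes(x._2), HBase keeps and displays the raw 4-byte big-endian encoding (\x00\x00\x00\x02 for 2, and so on). A minimal sketch of decoding such a value, assuming the same Bytes utility class; the names raw and decoded are only for illustration:
- // hedged sketch: decode a 4-byte HBase cell value back into an Int
- import org.apache.hadoop.hbase.util.Bytes
- val raw = Bytes.toBytes(2)       // Array[Byte](0, 0, 0, 2)
- val decoded = Bytes.toInt(raw)   // 2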