How to use Scala on Spark to load data into HBase/MapRDB -- normal load or bulk load.
I will introduce two ways: one is a normal load using Put, and the other uses the Bulk Load API.
1. Normal Load using org.apache.hadoop.hbase.client.Put (for HBase and MapRDB)
This approach uses a Put object to load data row by row. It is not as efficient as bulk loading.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable

// Point the HBase configuration at the target table
val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Put one row: rowkey "row999", cell cf:column_name = "value999"
val myTable = new HTable(conf, tableName)
val p = new Put(Bytes.toBytes("row999"))
p.add("cf".getBytes(), "column_name".getBytes(), "value999".getBytes())
myTable.put(p)
myTable.flushCommits()
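Note that in newer HBase client versions HTable and flushCommits are deprecated in favor of the Connection/Table API. Below is a minimal sketch of the same single-row put, assuming an HBase 1.x or later client is on the classpath; the MapRDB-style table path "/t1" is reused from the example above (with vanilla HBase you would use a plain table name):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Reuse the HBaseConfiguration "conf" created above
val connection = ConnectionFactory.createConnection(conf)
val t1 = connection.getTable(TableName.valueOf("/t1"))

val put = new Put(Bytes.toBytes("row999"))
// addColumn replaces the deprecated Put.add
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column_name"), Bytes.toBytes("value999"))
t1.put(put)   // Table.put sends the write; batching would go through a BufferedMutator

t1.close()
connection.close()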
2. Bulk Load using HFiles (for HBase only)
This approach has 2 steps: the first step is to generate HFiles, and the second is to use org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the HFiles into HBase. This only works for HBase tables, not for MapRDB tables, because MapRDB does not support bulk loading using HFiles.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Save HFiles on HDFS
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)
After the bulk load finishes, the rows show up in the hbase shell:

hbase(main):020:0> scan 'hao'
ROW                 COLUMN+CELL
 \x00\x00\x00\x01   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x02   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x03   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x04   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x05   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x06   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x07   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x08   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x09   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x0A   column=cf:c1, timestamp=1425128075675, value=value_xxx
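The same sanity check can also be done from the Spark shell instead of the hbase shell. Here is a minimal sketch that reuses the conf and table objects from the Scala example above:

import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// "table" is the HTable for "hao" created in the example above
val scanner = table.getScanner(new Scan())
scanner.iterator().asScala.foreach { result =>
  val rowKey = Bytes.toStringBinary(result.getRow())
  val value  = Bytes.toString(result.getValue("cf".getBytes(), "c1".getBytes()))
  println(rowKey + " -> " + value)
}
scanner.close()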
3. Direct Bulk Load without HFiles (for HBase and MapRDB)
This approach does not need to create HFiles on HDFS; it saves to the HBase/MapRDB table directly. There is only a minor difference compared to the example above.
Change from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
to:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Here is a complete example:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load into the HBase/MapRDB table.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
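To spot-check a single row after the direct bulk load, a Get against the same table handle is enough. This is just a quick sketch; row key 1 comes from the sample data generated above:

import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

val result = table.get(new Get(Bytes.toBytes(1)))
val value = Bytes.toString(result.getValue("cf".getBytes(), "c1".getBytes()))
println(value)   // expected: value_xxx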
Note: In the above example, I am using "saveAsNewAPIHadoopFile" to save HFiles on HDFS.
You can also use "saveAsNewAPIHadoopDataset" to achieve the same goal.
For example, just change the code from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
to:
job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
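saveAsNewAPIHadoopDataset takes no path or class arguments; it reads everything from the configuration. This works here because HFileOutputFormat.configureIncrementalLoad(job, table) already set the output format and output key/value classes on the job, so only the output directory needs to be added. If the job had not been configured that way, a minimal sketch (reusing the rdd and job from the example above) would set those explicitly:

// Only needed if configureIncrementalLoad has not already configured the job
job.setOutputFormatClass(classOf[HFileOutputFormat])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[KeyValue])
// "mapred.output.dir" is the old property name; on Hadoop 2+ the canonical name is
// "mapreduce.output.fileoutputformat.outputdir", though the old name is still mapped
job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)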