How to use Scala on Spark to load data into HBase/MapRDB -- normal load or bulk load.
I will introduce three approaches: a normal load using Put, a bulk load using the HFile-based Bulk Load API, and a direct bulk load without HFiles.
1. Normal Load using org.apache.hadoop.hbase.client.Put (For HBase and MapRDB)
This approach uses a Put object to write rows one at a time. It is simple, but not as efficient as bulk loading.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable

val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Write a single cell: row "row999", column cf:column_name, value "value999".
val myTable = new HTable(conf, tableName)
val p = new Put("row999".getBytes())
p.add("cf".getBytes(), "column_name".getBytes(), "value999".getBytes())
myTable.put(p)
myTable.flushCommits()
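If you have more than a handful of rows, pushing Puts one by one from the driver gets slow. A minimal sketch of a common variation (not part of the original example) is to write the Puts from the executors with foreachPartition and let the client buffer them; it assumes the same "/t1" table and cf:column_name column as above, sc is the spark-shell SparkContext, and the generated row keys are purely illustrative:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical data set: 1000 generated rows.
val rows = sc.parallelize(1 to 1000)

rows.foreachPartition { part =>
  // One table handle per partition, created on the executor side.
  val conf = HBaseConfiguration.create()
  val table = new HTable(conf, "/t1")
  table.setAutoFlush(false)            // buffer Puts client-side instead of one RPC per Put
  part.foreach { i =>
    val p = new Put(Bytes.toBytes("row%06d".format(i)))   // illustrative row-key format
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("column_name"), Bytes.toBytes("value" + i))
    table.put(p)
  }
  table.flushCommits()                 // send the buffered Puts
  table.close()
}

This keeps the connection setup and flushing per partition instead of per row, which is usually noticeably faster than the single-Put loop above, though still slower than a true bulk load.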
2. Bulk Load using HFiles (For HBase only)
This approach has two steps: first generate HFiles, then use org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the HFiles into HBase. It only works for HBase tables, not for MapRDB tables, because MapRDB does not support bulk loading with HFiles.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Save HFiles on HDFS
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)
hbase(main):020:0> scan 'hao'
ROW                  COLUMN+CELL
 \x00\x00\x00\x01    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x02    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x03    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x04    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x05    column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x06    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x07    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x08    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x09    column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x0A    column=cf:c1, timestamp=1425128075675, value=value_xxx
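One caveat before moving on: HFileOutputFormat rejects cells that arrive out of ascending row-key order within a task, and the 1-to-10 sample above just happens to be generated in order. If your real row keys are not already sorted, sort the RDD before saving the HFiles. A minimal sketch, reusing the rdd from the example above and adding an explicit byte-wise ordering (the ordering helper is my own addition, not part of the original post):

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Byte-wise ordering on the row key, so sortByKey matches HBase's lexicographic order.
implicit val rowKeyOrdering: Ordering[ImmutableBytesWritable] =
  new Ordering[ImmutableBytesWritable] {
    def compare(a: ImmutableBytesWritable, b: ImmutableBytesWritable): Int =
      Bytes.compareTo(a.get(), a.getOffset, a.getLength, b.get(), b.getOffset, b.getLength)
  }

// Sort the (rowkey, KeyValue) pairs, then call saveAsNewAPIHadoopFile on sortedRdd instead of rdd.
val sortedRdd = rdd.sortByKey()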
3. Direct Bulk Load without HFiles (For HBase and MapRDB)
This approach does not need to create HFiles on HDFS; it saves to the HBase/MapRDB table directly. There is only a minor difference compared to the example above.
Change from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
To:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Here is a complete example:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows
val num = sc.parallelize(1 to 10)
val rdd = num.map(x => {
  val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes())
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Bulk load directly into the HBase/MapRDB table (no separate LoadIncrementalHFiles step).
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Note: In the above example, I am using "saveAsNewAPIHadoopFile" to save HFiles on HDFS.
You can also use "saveAsNewAPIHadoopDataset" to achieve the same goal.
For example, just change the code from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
To:
job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
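Either way, once the job finishes you can spot-check a row from the same spark-shell session with a simple Get. This is just a verification sketch: the table name "hao" and column cf:c1 come from the examples above, and row key 1 is one of the sample integer keys.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes

// Read back one of the sample rows (integer row key 1) and print cf:c1.
val verifyConf = HBaseConfiguration.create()
val verifyTable = new HTable(verifyConf, "hao")
val result = verifyTable.get(new Get(Bytes.toBytes(1)))
println(Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("c1"))))  // expect "value_xxx"
verifyTable.close()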