How to use Scala on Spark to load data into HBase/MapR-DB -- normal load or bulk load

This article shows sample code for loading data into HBase or MapR-DB (M7) using Scala on Spark.
It covers three approaches: a normal load using Put, a bulk load through HFiles, and a direct bulk load without HFiles.

1. Normal load using org.apache.hadoop.hbase.client.Put (for HBase and MapR-DB)

This approach uses Put objects to write rows one at a time. It is less efficient than bulk loading.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
// A name starting with "/" is a MapR-DB table path; for an HBase table use the plain table name.
val tableName = "/t1"

val myTable = new HTable(conf, tableName)
// Build one Put per row: row key first, then column family, qualifier and value.
val p = new Put(Bytes.toBytes("row999"))
p.add(Bytes.toBytes("cf"), Bytes.toBytes("column_name"), Bytes.toBytes("value999"))
myTable.put(p)
// Push any buffered writes to the server, then release the table.
myTable.flushCommits()
myTable.close()
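
Since writing one Put at a time is the slow part of this approach, a common mitigation is to send Puts in batches from each Spark partition. The following is only a sketch under assumptions: toPut and loadWithPuts are hypothetical helper names, the batch size of 1000 is arbitrary, the column family "cf" is assumed to exist as in the example above, and it uses the same older HTable client API as the rest of this article.

```scala
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD

// Hypothetical helper: one Put that writes `value` to cf:column_name of `rowKey`.
def toPut(rowKey: String, value: String): Put = {
  val p = new Put(Bytes.toBytes(rowKey))
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("column_name"), Bytes.toBytes(value))
  p
}

// Hypothetical helper: writes (rowKey, value) pairs in batches,
// opening one table handle per partition.
def loadWithPuts(rows: RDD[(String, String)], tableName: String, batchSize: Int = 1000): Unit =
  rows.foreachPartition { part =>
    // Created on the executor, because the configuration and table are not serializable.
    val table = new HTable(HBaseConfiguration.create(), tableName)
    try {
      part.grouped(batchSize).foreach { batch =>
        // HTable.put also accepts a java.util.List of Puts.
        table.put(batch.map { case (k, v) => toPut(k, v) }.asJava)
      }
      table.flushCommits()
    } finally {
      table.close()
    }
  }
```

From spark-shell it could be called as loadWithPuts(sc.parallelize(Seq("row1" -> "value1", "row2" -> "value2")), "/t1"). This is still a normal load through the write path, so the bulk load approaches remain the better fit for large volumes.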

2. Bulk load using HFiles (for HBase only)

This approach has two steps: first generate the HFiles, then use org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load them into HBase.
It only works for HBase tables, not for MapR-DB tables, because MapR-DB does not support bulk loading from HFiles.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows, keyed by the integers 1 to 10.
val num = sc.parallelize(1 to 10)
val rdd = num.map { x =>
  val kv = new KeyValue(Bytes.toBytes(x), Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
}

// Step 1: save the HFiles on HDFS.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Step 2: bulk load the HFiles into HBase.
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)
After the bulk load, a scan of the table shows the 10 inserted rows:
hbase(main):020:0> scan 'hao'
ROW                                                 COLUMN+CELL
 \x00\x00\x00\x01                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x02                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x03                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x04                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x05                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x06                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x07                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x08                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x09                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x0A                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
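
The row keys in the scan look like \x00\x00\x00\x01 because Bytes.toBytes(x) on an Int produces four bytes, most significant byte first, and HBase orders rows by comparing those bytes as unsigned values. The sketch below reproduces this with plain Scala so it can be tried without a cluster. The names encodeInt, render and compareKeys are hypothetical stand-ins, and render is a simplification that hex-escapes every byte, whereas the HBase shell prints printable bytes as characters.

```scala
import java.nio.ByteBuffer

// Stand-in for Bytes.toBytes(Int): four bytes, most significant byte first.
def encodeInt(x: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(x).array()

// Simplified rendering that hex-escapes every byte.
def render(bytes: Array[Byte]): String =
  bytes.map(b => "\\x" + "%02X".format(b & 0xff)).mkString

// Unsigned, byte-by-byte comparison; a shorter key that is a prefix sorts first.
def compareKeys(a: Array[Byte], b: Array[Byte]): Int = {
  val firstDiff = a.zip(b).collectFirst { case (x, y) if x != y => (x & 0xff) - (y & 0xff) }
  firstDiff.getOrElse(a.length - b.length)
}

// The ten keys from the example, in the order the scan lists them.
val keys = (1 to 10).map(encodeInt)
keys.foreach(k => println(render(k)))

// Integer keys 2 and 10 keep their numeric order ...
val intOrder = compareKeys(encodeInt(2), encodeInt(10))
// ... while the string keys "2" and "10" do not: "10" sorts before "2".
val stringOrder = compareKeys("10".getBytes("UTF-8"), "2".getBytes("UTF-8"))
```

Here intOrder is negative (2 sorts before 10) and stringOrder is also negative ("10" sorts before "2"), which is worth keeping in mind when choosing between numeric and string row keys. Negative integers would sort after positive ones under this comparison, since their first byte is 0xFF.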

3. Direct bulk load without HFiles (for HBase and MapR-DB)

This approach does not need to create HFiles on HDFS; it saves to the HBase or MapR-DB table directly.
Compared with the previous example, the save call takes a different configuration argument and the LoadIncrementalHFiles step is dropped.
Change:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
to:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Here is a complete example:

import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName)

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job, table)

// Generate 10 sample rows, keyed by the integers 1 to 10.
val num = sc.parallelize(1 to 10)
val rdd = num.map { x =>
  val kv = new KeyValue(Bytes.toBytes(x), Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value_xxx"))
  (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
}

// Bulk load directly into the HBase/MapR-DB table, using the job's configuration.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

Note: The example above uses saveAsNewAPIHadoopFile to write the output.
You can also use saveAsNewAPIHadoopDataset to achieve the same goal.
For example, change:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
to:
job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
