Spark and HBase

Spark can work with multiple formats, including HBase tables. Unfortunately, I could not get the HBase Python examples included with Spark to work, so you may need to experiment with Scala instead. The Scala code in the quick start documentation is fairly easy to read and understand even if you know Python but not Scala (http://spark.apache.org/docs/1.2.1/quick-start.html). You can use the Scala example 'HBaseTest.scala' as the model for this exploration.
Start the HBase server. Then, in the Spark installation directory, start the Spark shell with the HBase jars on the driver classpath. For example:
$ ../hbase-0.98.9-hadoop2/bin/start-hbase.sh
$ HBASE_PATH=`../hbase-0.98.9-hadoop2/bin/hbase classpath`
$ bin/spark-shell --driver-class-path $HBASE_PATH
scala>
You are now ready to enter Scala code in the interactive shell.
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> val conf = HBaseConfiguration.create()
scala> val tableName="spark_table"
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> conf.set(TableInputFormat.INPUT_TABLE,tableName)
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
scala> val admin = new HBaseAdmin(conf)
scala> admin.isTableAvailable(tableName)
res2: Boolean = false
So far, you have set up a connection to HBase and confirmed that the table you want to create does not exist yet.
scala> import org.apache.hadoop.hbase.{HTableDescriptor, HColumnDescriptor}
scala> val tableDesc = new HTableDescriptor(tableName)
scala> tableDesc.addFamily(new HColumnDescriptor("cf".getBytes()))
scala> admin.createTable(tableDesc)
Now you have created a new table, “spark_table”, with a column family, “cf”. In another terminal window, from the HBase installation directory, you can run the HBase shell to verify that the table has indeed been created and is empty.
$ bin/hbase shell
hbase(main):001:0> list
spark_table
=> ["spark_table"]
hbase(main):002:0> scan "spark_table"
ROW COLUMN+CELL
0 row(s) in 2.5640 seconds
Go back to the Scala shell prompt and enter some data.
scala> import org.apache.hadoop.hbase.client.{HTable,Put}
scala> val table = new HTable(conf, tableName)
scala> val row = new Put("dummy".getBytes())
scala> row.add("cf".getBytes(), "content".getBytes(), "Test data".getBytes())
scala> table.put(row)
scala> table.flushCommits()
You have opened a connection to the HBase table and stored a row with the key “dummy” and the value “Test data” in the column “cf:content”. You can verify from the HBase shell that the data has indeed been stored:
hbase(main):003:0> scan "spark_table"
ROW COLUMN+CELL
dummy column=cf:content, timestamp=1427880756119, value=Test data
1 row(s) in 0.1200 seconds
Suppose you have a group of text files in an HDFS directory, TextFiles, and you want to store them as key-value pairs in the HBase table, with the file name as the key and the file content as the value.
The following code shows one way of doing so. It is more complex than you might expect because it must avoid serialization exceptions: HTable is not serializable, so the table is opened inside foreachPartition, once per partition on the executors, instead of being created in the driver and shipped with the closure. (For an explanation, see http://stackoverflow.com/questions/25250774/writing-to-hbase-via-spark-task-not-serializable)
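scala> val filesRDD = sc.wholeTextFiles("hdfs://localhost/fedora/TextFiles")
scala> filesRDD.foreachPartition { iter =>
| // HTable is not serializable, so it is opened here, once per partition, on the executor
| val hbaseConf = HBaseConfiguration.create()
| val table = new HTable(hbaseConf, tableName)
| iter.foreach { x =>
| val p = new Put(x._1.getBytes())
| p.add("cf".getBytes(), "content".getBytes(), x._2.getBytes())
| table.put(p)
| }
| table.close() // flushes any buffered puts and releases the table's resources
| }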
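The reason for opening the table inside foreachPartition can be seen without a cluster. Spark ships each task's closure to the executors using Java serialization, so everything the closure captures must be serializable. The minimal sketch below, in plain Scala, uses a hypothetical FakeTable class as a stand-in for HTable (neither implements java.io.Serializable). It checks whether two closures survive Java serialization: one that captures a table created on the driver, and one that captures only the table name and opens the table itself, as the foreachPartition code above does.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for HTable: like HTable, it does NOT implement Serializable.
class FakeTable(val name: String) {
  def put(row: String): Unit = println(s"put $row into $name")
}

object ClosureSerializationDemo {
  // Java-serializes obj into memory, which is what Spark does to task closures.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val tableName = "spark_table"

    // Anti-pattern: the closure captures a table object created on the driver.
    val driverTable = new FakeTable(tableName)
    val badClosure: Iterator[String] => Unit = rows => rows.foreach(driverTable.put)

    // Pattern used in the post: capture only the (serializable) table name and
    // open the table inside the closure, i.e. once per partition on the executor.
    val goodClosure: Iterator[String] => Unit = rows => {
      val table = new FakeTable(tableName)
      rows.foreach(table.put)
    }

    println(s"badClosure serializable:  ${isSerializable(badClosure)}")
    println(s"goodClosure serializable: ${isSerializable(goodClosure)}")
  }
}
```

Running main reports false for badClosure and true for goodClosure. In Spark, the first case surfaces as a "Task not serializable" error when the job is submitted.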
In the foreachPartition code, the parameter x is a (key, value) tuple whose elements are accessed as x._1 and x._2, a syntax that may look unfamiliar if you come from Python. The key is the file name (the full HDFS path returned by wholeTextFiles) and the value is the file's content.
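If the positional _1/_2 syntax is hard to read, Scala lets you destructure the tuple with a pattern-matching anonymous function instead. The sketch below runs on a small local Seq of hypothetical (path, content) pairs standing in for the output of sc.wholeTextFiles, and shows that both styles produce the same row keys and values. It also encodes strings explicitly as UTF-8: String.getBytes() with no argument uses the JVM's default charset, while HBase's own Bytes.toBytes(String) always uses UTF-8.

```scala
import java.nio.charset.StandardCharsets

object TupleAccessDemo {
  // Hypothetical sample data standing in for the (path, content) pairs of sc.wholeTextFiles.
  val files: Seq[(String, String)] = Seq(
    ("hdfs://localhost/fedora/TextFiles/a.txt", "first file"),
    ("hdfs://localhost/fedora/TextFiles/b.txt", "second file")
  )

  // Positional access, as in the post: x._1 is the key, x._2 is the value.
  def withPositions(xs: Seq[(String, String)]): Seq[(Array[Byte], Array[Byte])] =
    xs.map(x => (x._1.getBytes(StandardCharsets.UTF_8), x._2.getBytes(StandardCharsets.UTF_8)))

  // Same transformation with a pattern-matching anonymous function that names the parts.
  def withNames(xs: Seq[(String, String)]): Seq[(Array[Byte], Array[Byte])] =
    xs.map { case (path, content) =>
      (path.getBytes(StandardCharsets.UTF_8), content.getBytes(StandardCharsets.UTF_8))
    }

  def main(args: Array[String]): Unit =
    withNames(files).foreach { case (key, value) =>
      println(s"${new String(key, StandardCharsets.UTF_8)} -> ${new String(value, StandardCharsets.UTF_8)}")
    }
}
```

Inside foreachPartition, the same idea reads iter.foreach { case (fileName, content) => ... }, which makes it obvious which part becomes the row key.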
More likely, you will want to use an existing HBase table. All you need to do is create an RDD from the table; after that, you can apply any RDD operation to it, e.g. count(), as shown below:
scala> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
| classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
| classOf[org.apache.hadoop.hbase.client.Result])
scala> hBaseRDD.count()
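count() only tells you how many rows there are. Each element of hBaseRDD is an (ImmutableBytesWritable, Result) pair, where the Result holds the cells of one row. The sketch below, built around a hypothetical HBaseRowDecoder object, turns such a pair into a readable (row key, value) pair for the cf:content column used in this post. It relies on HBase's Bytes utility class and Result.getValue, which returns null when the row has no such cell, so the value is wrapped in an Option.

```scala
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: decodes one element of the newAPIHadoopRDD into strings.
object HBaseRowDecoder {
  // Column family and qualifier used throughout this post ("cf:content").
  val Family: Array[Byte] = Bytes.toBytes("cf")
  val Qualifier: Array[Byte] = Bytes.toBytes("content")

  def decode(pair: (ImmutableBytesWritable, Result)): (String, Option[String]) = {
    val result = pair._2 // the Result carries both the row key and the cells
    val rowKey = Bytes.toString(result.getRow)
    // getValue returns null if the row has no cf:content cell
    val content = Option(result.getValue(Family, Qualifier)).map(b => Bytes.toString(b))
    (rowKey, content)
  }
}
```

In the shell, after pasting the object (for example with :paste), something like hBaseRDD.map(HBaseRowDecoder.decode).take(5).foreach(println) prints the first few rows. Result is not java.io.Serializable, so decoding on the executors before calling take or collect avoids shipping Result objects back to the driver.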
The Spark universe continues to expand as more people become familiar with it. It certainly seems like a technology with a bright future, and one well worth exploring.