Spark and HBase
Spark can work with data in many formats and storage systems, including HBase tables. Unfortunately, I could not get the HBase Python examples included with Spark to work, so you may need to experiment with Scala and Spark instead. The quick-start documentation with Scala code is fairly easy to read and understand even if you know Python but not Scala (http://spark.apache.org/docs/1.2.1/quick-start.html). You can use the Scala example 'HBaseTest.scala' as the model for this exploration.
Start the HBase server. Then, in the Spark installation directory, start the Spark shell with the HBase jars included in the driver classpath, e.g.:
$ ../hbase-0.98.9-hadoop2/bin/start-hbase.sh
$ HBASE_PATH=`../hbase-0.98.9-hadoop2/bin/hbase classpath`
$ bin/spark-shell --driver-class-path $HBASE_PATH

You are now ready to enter Scala code in the interactive shell.
scala> import org.apache.hadoop.hbase.HBaseConfiguration
scala> val conf = HBaseConfiguration.create()
scala> val tableName = "spark_table"
scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
scala> conf.set(TableInputFormat.INPUT_TABLE, tableName)
scala> import org.apache.hadoop.hbase.client.HBaseAdmin
scala> val admin = new HBaseAdmin(conf)
scala> admin.isTableAvailable(tableName)
res2: Boolean = false
So far, you have set up a connection to HBase and confirmed that the table you wish to create does not exist.

scala> import org.apache.hadoop.hbase.{HTableDescriptor, HColumnDescriptor}
scala> val tableDesc = new HTableDescriptor(tableName)
scala> tableDesc.addFamily(new HColumnDescriptor("cf".getBytes()))
scala> admin.createTable(tableDesc)

Now you have created a new table, "spark_table", with a column family, "cf". In another terminal window, in the HBase installation directory, you can run the HBase shell and verify that the table has indeed been created and that it is empty.
$ bin/hbase shell
hbase(main):001:0> list
spark_table
=> ["spark_table"]
hbase(main):002:0> scan "spark_table"
ROW                              COLUMN+CELL
0 row(s) in 2.5640 seconds

You can go back to the Scala shell prompt and enter some data.
scala> import org.apache.hadoop.hbase.client.{HTable, Put}
scala> val table = new HTable(conf, tableName)
scala> var row = new Put("dummy".getBytes())
scala> row.add("cf".getBytes(), "content".getBytes(), "Test data".getBytes())
scala> table.put(row)
scala> table.flushCommits()

You have created a connection to the HBase table and created a row with the key "dummy" and the content "Test data" in the column "cf:content". You can verify from the HBase shell that this data has indeed been stored:
hbase(main):003:0> scan "spark_table"
ROW                              COLUMN+CELL
 dummy                           column=cf:content, timestamp=1427880756119, value=Test data
1 row(s) in 0.1200 seconds

Suppose you have a group of text files in an HDFS directory, TextFiles, and you want to store them as (key, value) pairs in the HBase table, where the file name is the key and the file content is the value.
The following code illustrates one way of doing so. The code looks more involved than you might expect in order to avoid serialization exceptions: the HBase configuration and table handle are created inside foreachPartition, so they are constructed on each worker rather than serialized from the driver. (For an explanation, see http://stackoverflow.com/questions/25250774/writing-to-hbase-via-spark-task-not-serializable.)
scala> val filesRDD = sc.wholeTextFiles("hdfs://localhost/fedora/TextFiles")
scala> filesRDD.foreachPartition { iter =>
     |   val hbaseConf = HBaseConfiguration.create()
     |   val table = new HTable(hbaseConf, tableName)
     |   iter.foreach { x =>
     |     var p = new Put(x._1.getBytes())
     |     p.add("cf".getBytes(), "content".getBytes(), x._2.getBytes())
     |     table.put(p)
     |   }}

The parameter x is a (key, value) tuple, whose elements are accessed as x._1 and x._2, a syntax that may not be familiar. The key is the file name and the value is the content of the file.
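If the ._1 and ._2 notation seems cryptic, the same loop can be written with a pattern match that gives the tuple elements descriptive names. The following is only a sketch of that variation, not part of the original session; it assumes the same filesRDD, tableName and "cf" column family defined above.

scala> filesRDD.foreachPartition { iter =>
     |   // Create the configuration and table handle on the worker, as before
     |   val hbaseConf = HBaseConfiguration.create()
     |   val table = new HTable(hbaseConf, tableName)
     |   // Destructure each (fileName, content) pair instead of using x._1 and x._2
     |   iter.foreach { case (fileName, content) =>
     |     val p = new Put(fileName.getBytes())
     |     p.add("cf".getBytes(), "content".getBytes(), content.getBytes())
     |     table.put(p)
     |   }}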
It is far more likely that you will want to use an existing HBase table. All you need to do is create an RDD of the table; after that, you can use all the operations available on an RDD, e.g. count(), as shown below:
scala> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
     |   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
     |   classOf[org.apache.hadoop.hbase.client.Result])
scala> hBaseRDD.count()
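Counting rows is only a start. As a further illustration, the sketch below converts each Result in the RDD back into plain strings; it is not part of the original walkthrough and assumes the spark_table created above, with its single cf:content column.

scala> import org.apache.hadoop.hbase.util.Bytes
scala> val contents = hBaseRDD.map { case (_, result) =>
     |   // Turn the row key and the cf:content cell from bytes into strings
     |   (Bytes.toString(result.getRow()),
     |    Bytes.toString(result.getValue("cf".getBytes(), "content".getBytes())))
     | }
scala> contents.collect().foreach(println)

Mapping to plain strings inside the RDD also means you are not trying to ship HBase Result objects back to the driver when you call collect().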
The Spark universe continues to expand as familiarity with it increases. It definitely seems like a technology with a bright future and one well worth exploring.