After seven years of development, HBase finally released version 1.0.0 at the end of February this year. This release provides some exciting features and, without sacrificing stability, introduces a new API. Although 1.0.0 is compatible with the old API, you should still get familiar with the new API as soon as possible, and understand how to combine it with the currently popular Spark to read and write data. Since there is little material, at home or abroad, on the new HBase 1.0.0 API, I wrote this article.
This article is in two parts: the first part explains basic CRUD operations with the new API; the second part explains how Spark writes RDDs into an HBase table and, conversely, how an HBase table is loaded into Spark as an RDD.
Environment configuration
To avoid unnecessary trouble caused by version inconsistencies, both the API and the HBase environment are version 1.0.0. HBase runs in standalone mode; distributed mode is used in much the same way, and only the HBaseConfiguration settings need to be changed.
Use SBT to load the dependencies in the development environment:
Name: = "SparkLearn"
Version: = "1.0"
ScalaVersion: = "2.10.4"
LibraryDependencies + = "org.apache.spark" %% "spark-core"% "1.3.0"
LibraryDependencies + = "org.apache.hbase"% "hbase-client"% "1.0.0"
LibraryDependencies + = "org.apache.hbase"% "hbase-common"% "1.0.0"
LibraryDependencies + = "org.apache.hbase"% "hbase-server"% "1.0.0"
|
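The CRUD examples below assume roughly the following imports; this is only a sketch, and the exact set depends on which operations you use.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Get, Delete, Scan}
import org.apache.hadoop.hbase.util.Bytes
// allows iterating a ResultScanner with a Scala for loop
import scala.collection.JavaConversions._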
HBase CRUD operations
The new version of the API adds Connection; HAdmin has become Admin and HTable has become Table, and Admin and Table instances can only be obtained from a Connection. Creating a Connection is a heavyweight operation, and Connection is thread-safe, so it should be used as a singleton. Its factory method requires an HBaseConfiguration.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "master")

// Creating a Connection is heavyweight; it is thread-safe and is the entry point for all HBase operations
val conn = ConnectionFactory.createConnection(conf)
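Because the Connection is thread-safe and expensive to create, a common pattern is to hold it in a singleton object and close it when the application shuts down. A minimal sketch, with HBaseConn as a purely illustrative name:

object HBaseConn {
  private val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("hbase.zookeeper.quorum", "master")

  // one shared, thread-safe Connection for the whole application
  lazy val connection = ConnectionFactory.createConnection(conf)

  def close(): Unit = connection.close()
}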
Create a table
Use Admin to create and delete tables.
val userTable = TableName.valueOf("user")
val admin = conn.getAdmin

// create the user table
val tableDescr = new HTableDescriptor(userTable)
tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))

println("Creating table `user`.")
if (admin.tableExists(userTable)) {
  admin.disableTable(userTable)
  admin.deleteTable(userTable)
}
admin.createTable(tableDescr)
println("Done!")
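To check that the table really exists, you can list the table names through the same Admin instance; a small sketch (remember to close the Admin when you are done with it):

// print all tables known to HBase, then release the Admin
admin.listTableNames().foreach(t => println(t.getNameAsString))
admin.close()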
Insert, query, scan, and delete operations
To operate on HBase you first create an operation object such as Put, Get, or Delete, and then call the corresponding method on Table.
try {
  // get the user table
  val table = conn.getTable(userTable)

  try {
    // prepare to insert a row with key id001
    val p = new Put("id001".getBytes)
    // specify the column and value for the put operation (the previous put.add method is deprecated)
    p.addColumn("basic".getBytes, "name".getBytes, "wuchong".getBytes)
    // submit
    table.put(p)

    // query a row
    val g = new Get("id001".getBytes)
    val result = table.get(g)
    val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
    println("GET id001: " + value)

    // scan the data
    val s = new Scan()
    s.addColumn("basic".getBytes, "name".getBytes)
    val scanner = table.getScanner(s)

    try {
      for (r <- scanner) {
        println("Found row: " + r)
        println("Found value: " + Bytes.toString(
          r.getValue("basic".getBytes, "name".getBytes)))
      }
    } finally {
      // make sure the scanner is closed
      scanner.close()
    }

    // delete a row, in a manner similar to Put
    val d = new Delete("id001".getBytes)
    d.addColumn("basic".getBytes, "name".getBytes)
    table.delete(d)

  } finally {
    if (table != null) table.close()
  }

} finally {
  conn.close()
}
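Table also accepts a java.util.List of operations, which is convenient when inserting many rows at once. A minimal sketch, assuming table is an open Table for the user table and the basic column family exists:

import scala.collection.JavaConversions._

// build three Put operations and send them in a single batch call
val puts = for (i <- 1 to 3) yield {
  val p = new Put(Bytes.toBytes("id%03d".format(i)))
  p.addColumn("basic".getBytes, "name".getBytes, Bytes.toBytes("user" + i))
  p
}
table.put(puts.toList)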
Operating HBase from Spark
Writing to HBase
To write data to HBase, we need to use PairRDDFunctions.saveAsHadoopDataset. Because HBase is not a file system, the saveAsHadoopFile method is of no use here.
def saveAsHadoopDataset(conf: JobConf): Unit
Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system
This method takes a JobConf as a parameter, which is similar to a configuration object; we mainly need to specify the output format and the output table name.
Step 1: We need to create a JobConf first.
// define the HBase configuration
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "master")

// specify the output format and the output table name
val jobConf = new JobConf(conf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")
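Note that saveAsHadoopDataset belongs to the old MapReduce API, so TableOutputFormat and JobConf above come from the mapred packages, while the read example later uses TableInputFormat from the mapreduce package. The imports assumed for this step are roughly:

import org.apache.hadoop.hbase.HBaseConfiguration
// old-API output format, used together with JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf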
Step 2: Map the RDD to the HBase table schema. An HBase table schema generally looks like this:

row    cf:col_1    cf:col_2
In Spark we operate on RDDs of tuples, such as (1,"lilei",14) and (2,"hanmei",18). We need to convert RDD[(uid:Int, name:String, age:Int)] into RDD[(ImmutableBytesWritable, Put)], so we define a convert function to do this conversion:
def convert(triple: (Int, String, Int)) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
  p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
  (new ImmutableBytesWritable, p)
}
Step 3: Read the data into an RDD and convert it
// read RDD data from somewhere and convert it
val rawData = List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38))
val localData = sc.parallelize(rawData).map(convert)
Step 4: Write to HBase using the saveAsHadoopDataset method
localData.saveAsHadoopDataset(jobConf)
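If you prefer to stay on the new MapReduce API for writing as well, roughly the same thing can be expressed with saveAsNewAPIHadoopDataset and the mapreduce version of TableOutputFormat. A sketch under that assumption, reusing the conf defined above:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// new-API job configuration: output format plus target table
val job = Job.getInstance(conf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "user")

localData.saveAsNewAPIHadoopDataset(job.getConfiguration)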
Reading from HBase
To read from HBase, we use the newAPIHadoopRDD API provided by SparkContext to load the contents of the table into Spark as an RDD.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.zookeeper.quorum", "master")

// set the table to read
conf.set(TableInputFormat.INPUT_TABLE, "user")

val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

val count = usersRDD.count()
println("Users RDD Count: " + count)
usersRDD.cache()

// iterate over the results
usersRDD.foreach { case (_, result) =>
  val key = Bytes.toInt(result.getRow)
  val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
  val age = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
  println("Row key: " + key + " Name: " + name + " Age: " + age)
}
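The (ImmutableBytesWritable, Result) pairs can also be mapped into plain Scala values for further processing in Spark; a small sketch that builds an RDD of (id, name, age) tuples:

val users = usersRDD.map { case (_, result) =>
  val id = Bytes.toInt(result.getRow)
  val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
  val age = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
  (id, name, age)
}
users.collect().foreach(println)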
Appendix
The complete code has been uploaded to a Gist.
From: http://www.wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/