Spark operation HBase (1.0.0 new API)

HBase after seven years of development, and finally at the end of February this year, released 1.0.0 version. This version provides some exciting features, and, without sacrificing stability, the introduction of a new API. Although 1.0.0 compatible with the old version of the API, but still should be familiar with the new API as soon as possible. And understand how to combine with the current red Spark, the data written and read. In view of the domestic and foreign information on the new API HBase 1.0.0 little, so this article.
This article will be introduced in two parts, the first part of the new version of the API to explain the use of CRUD basic operation; the second part of the Spark to explain how to write RDDs HBase table, on the contrary, HBase table is how to RDDs loaded Spark inside.

Environment configuration

In order to avoid unnecessary inconsistencies in the version of the trouble, API and HBase environment are 1.0.0 version. HBase is a stand-alone mode, HBaseConfiguration mode is similar to the use of only need to modify the HBaseConfiguration can be.
Use the SBT to load dependencies in the development environment
  Name: = "SparkLearn" 

  Version: = "1.0" 

  ScalaVersion: = "2.10.4" 

  LibraryDependencies + = "org.apache.spark" %% "spark-core"% "1.3.0" 

  LibraryDependencies + = "org.apache.hbase"% "hbase-client"% "1.0.0" 

  LibraryDependencies + = "org.apache.hbase"% "hbase-common"% "1.0.0" 

  LibraryDependencies + = "org.apache.hbase"% "hbase-server"% "1.0.0"

HBase CRUD operation

The new version of the API to join the Connection , HAdmin became Admin , HTable into the Table , and Admin and Table can only be obtained through Connection . Connection is a Connection , because Connection is thread-safe, so Connection use of a single case, the factory method requires a HBaseConfiguration .
  Val conf = HBaseConfiguration.create () 
  Conf.set ("hbase.zookeeper.property.clientPort", "2181") 
  Conf.set ("hbase.zookeeper.quorum", "master") 

  // Connection creation is a heavyweight work, thread safe, is the operation of the importation of hbase val conn = ConnectionFactory.createConnection (conf)

Create a table

Use Admin create and delete tables
  Val userTable = TableName.valueOf ("user") 

  // create user table val tableDescr = new HTableDescriptor (userTable) 
  TableDescr.addFamily (new HColumnDescriptor ("basic" .getBytes)) 
  Println ("Creating table` user`. ") 
  If (admin.tableExists (userTable)) { 
    Admin.disableTable (userTable) 
    Admin.deleteTable (userTable) 
  } 
  Admin.createTable (tableDescr) 
  Println ("Done!")

Insert, query, scan, delete operation

HBase operations need to create an Put object Put , Get , Delete , etc., and then call the Table method on the Table
  Try { 
    // get the user table val table = conn.getTable (userTable) 

    Try { 
      / / Ready to insert a key for the id001 data val p = new Put ("id001" .getBytes) 
      // specify the column and value for the put operation (the previous put.add method is deprecated) 
      P.addColumn ("basic" .getBytes, "name" .getBytes, "wuchong" .getBytes) 
      // submit table.put (p) 

      // query a piece of data val g = new Get ("id001" .getBytes) 
      Val result = table.get (g) 
      Val value = Bytes.toString (result.getValue ("basic" .getBytes, "name" .getBytes)) 
      Println ("GET id001:" + value) 

      // scan data val s = new Scan () 
      S.addColumn ("basic" .getBytes, "name" .getBytes) 
      Val scanner = table.getScanner (s) 

      Try { 
        For (r <- scanner) { 
          Println ("Found row:" + r) 
          Println ("Found value:" + Bytes.toString ( 
            R.getValue ("basic" .getBytes, "name" .getBytes))) 
        } 
      } Finally { 
        // fix scanner close scanner.close () 
      } 

      // delete a piece of data in a manner similar to that of val d = new Delete ("id001" .getBytes) 
      D.addColumn ("basic" .getBytes, "name" .getBytes) 
      Table.delete (d) 

    } Finally { 
      If (table! = Null) table.close () 
    } 

  } Finally { 
    Conn.close () 
  }

Spark operation HBase

Write HBase

First write data to HBase, we need to use PairRDDFunctions.saveAsHadoopDataset . Because HBase is not a file system, the saveAsHadoopFile method is useless.
def saveAsHadoopDataset(conf: JobConf): Unit
Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system
This method requires a JobConf as a parameter, similar to a configuration item, the main need to specify the output format and output table name.
Step 1: We need to create a JobConf first.
  // Define HBase configuration val conf = HBaseConfiguration.create () 
  Conf.set ("hbase.zookeeper.property.clientPort", "2181") 
  Conf.set ("hbase.zookeeper.quorum", "master") 

  // specify the output format and output the table name val jobConf = new JobConf (conf, this.getClass) 
  JobConf.setOutputFormat (classOf [TableOutputFormat]) 
  JobConf.set (TableOutputFormat.OUTPUT_TABLE, "user")
Step 2: RDD to table schema mapping in HBase table schema is generally the case:
 row cf :col_1 cf :col_2 
In Spark, we operate RDD tuples, such as (1,"lilei",14) , (2,"hanmei",18) . We need to convert RDD[(uid:Int, name:String, age:Int)] to RDD[(ImmutableBytesWritable, Put)] . So, we define a convert function to do this conversion job
  Def convert (triple: (Int, String, Int)) = { 
        Val p = new Put (Bytes.toBytes (triple._1)) 
        P.addColumn (Bytes.toBytes ("basic"), Bytes.toBytes ("name"), Bytes.toBytes (triple._2)) 
        P.addColumn (Bytes.toBytes ("basic"), Bytes.toBytes ("age"), Bytes.toBytes (triple._3)) 
        (New ImmutableBytesWritable, p) 
  }
Step 3: Read RDD and convert
  // read RDD data from somewhere and convert 
  Val rawData = List (1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38) 
  Val localData = sc.parallelize (rawData) .map (convert)
Step 4: Write HBase using the saveAsHadoopDataset method
  LocalData.saveAsHadoopDataset (jobConf)

Read HBase

Spark reads HBase, we use the newAPIHadoopRDD API provided by newAPIHadoopRDD to load the contents of the table into Spark as RDDs.
  Val conf = HBaseConfiguration .create ()
 Conf.set ( "hbase.zookeeper.property.clientPort" , "2181" )
 Conf.set ( "hbase.zookeeper.quorum" , "master" )

 // set the table name of the query
 Conf.set ( TableInputFormat . INPUT_TABLE , "user" )

 Val usersRDD = sc.newAPIHadoopRDD (conf, classOf [ TableInputFormat ],
 ClassOf [org.apache.hadoop.hbase.io. ImmutableBytesWritable ],
 ClassOf [org.apache.hadoop.hbase.client. Result ])

 Val count = usersRDD.count ()
 Println ( "Users RDD Count:" + count)
 UsersRDD.cache ()

 // traverse the output
 UsersRDD.foreach { case (_, result) =>
 Val key = Bytes .toInt (result.getRow)
 Val name = Bytes .toString (result.getValue ( "basic" .getBytes, "name" .getBytes))
 Val age = Bytes .toInt (result.getValue ( "basic" .getBytes, "age" .getBytes))
 Println ( "Row key:" + key + "Name:" + name + "Age:" + age)
 }

appendix

More complete code has been uploaded to Gist.
From: http: //www.wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/

Commentaires

Posts les plus consultés de ce blog

Controlling Parallelism in Spark by controlling the input partitions by controlling the input partitions

Spark performance optimization: shuffle tuning

Spark optimization