Read/Write data from HBase using Spark

Apache Spark is an open-source distributed computing framework for processing large datasets. Spark is mainly built around the concept of the RDD (Resilient Distributed Dataset).

An RDD is an immutable, distributed collection of objects partitioned across the cluster.

Two types of operations can be performed on an RDD (see the short sketch after this list):

1. Transformation
2. Action
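
As a quick illustration (a minimal sketch, not HBase-specific, assuming an existing SparkContext named sc), a transformation such as filter only describes a new RDD lazily, while an action such as count actually triggers the computation:

val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5)) // build an RDD from a local collection
val evens = numbers.filter(_ % 2 == 0)           // transformation: evaluated lazily, nothing runs yet
println(evens.count())                           // action: runs the job and prints 2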

Today we will see how we can use these transformations and actions on HBase data. Let's take the traditional 'Employee' example.

Pre-requisite: Make sure Spark, HBase and Hadoop are functioning properly.

We will run the following operations on HBase data using Spark.

1. Create a table in HBase.
2. Write data into HBase.
3. Read data from HBase.
4. Apply a filter operation on the HBase data.
5. Store the result in HDFS.

Just to give a high-level overview: I will write employee information into HBase and then filter those employee records whose age is less than 55.

Query: From the HBase data, filter those records whose age is less than 55.
Expected output: We should get two records, both with age 53.
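
With the sample data inserted in the code below, the filtered output that gets saved to HDFS should look roughly like this (the Scala tuple string form of (name, designation, age); the order of the two lines may vary):

(Shashi,Chief Architect,53)
(Chhaaaaaaya,Chief Architect,53)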

Below is the sample code for the above example:

package com.td.SparkHbaseDemo.SparkHBaseExample

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.spark._
import org.apache.hadoop.hbase.util.Bytes

object HBaseTest {
 def main(args: Array[String]) {

  val sparkConf = new SparkConf().setAppName("HBaseEmployeeDemo")
  val sc = new SparkContext(sparkConf)

  val conf = HBaseConfiguration.create()
  val tableName = "Employee"

  // Configuration for reading the HBase table through TableInputFormat
  conf.set(TableInputFormat.INPUT_TABLE, tableName)
  conf.set("zookeeper.znode.parent", "/hbase-unsecure") // adjust if your cluster uses a different znode parent
  conf.addResource("/etc/hbase/conf/hbase-site.xml")

  // Create the table if it does not already exist
  val hbaseAdmin = new HBaseAdmin(conf)
  if (!hbaseAdmin.isTableAvailable(tableName)) {
    val tableDesc = new HTableDescriptor(tableName)
    tableDesc.addFamily(new HColumnDescriptor("employeeInformation".getBytes()))
    hbaseAdmin.createTable(tableDesc)
  }

  // Insert sample employee records into the table
  val empTable = new HTable(conf, tableName)

  val empRecord1 = new Put(Bytes.toBytes("1"))
  empRecord1.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"), Bytes.toBytes("Shashi"))
  empRecord1.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge"), Bytes.toBytes("53"))
  empRecord1.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"), Bytes.toBytes("Chief Architect"))
  empTable.put(empRecord1)

  val empRecord2 = new Put(Bytes.toBytes("2"))
  empRecord2.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"), Bytes.toBytes("Ankit"))
  empRecord2.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge"), Bytes.toBytes("56"))
  empRecord2.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"), Bytes.toBytes("Data Architect"))
  empTable.put(empRecord2)

  val empRecord3 = new Put(Bytes.toBytes("3"))
  empRecord3.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"), Bytes.toBytes("Jitu"))
  empRecord3.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge"), Bytes.toBytes("65"))
  empRecord3.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"), Bytes.toBytes("CEO"))
  empTable.put(empRecord3)

  val empRecord4 = new Put(Bytes.toBytes("4"))
  empRecord4.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"), Bytes.toBytes("Chhaaaaaaya"))
  empRecord4.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge"), Bytes.toBytes("53"))
  empRecord4.add(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"), Bytes.toBytes("Chief Architect"))
  empTable.put(empRecord4)

  // Make sure the writes are flushed to HBase before reading the table back
  empTable.flushCommits()

  // Load the HBase table as an RDD of (ImmutableBytesWritable, Result) pairs
  val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])

  // Create an RDD of HBase Result objects (drop the row-key part of each pair)
  val resultRDD = hBaseRDD.map(x => x._2)

  // Read individual columns from each Result into a (name, designation, age) tuple
  val employeeRDD = resultRDD.map(result => (
    Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"))),
    Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"))),
    Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge")))))

  // Keep only the employees whose age is less than 55
  val filteredAge = employeeRDD.filter(result => result._3.toDouble < 55)

  // Save the filtered result to HDFS
  filteredAge.saveAsTextFile("/user/root/spark/HbaseFinalOut")

  sc.stop()
 }
}


I typically use Eclipse for packaging the code. Once you have successfully compiled and packaged your code, transfer the jar to the cluster where Spark is running and use spark-submit to start execution:
 
spark-submit --class com.td.SparkHbaseDemo.SparkHBaseExample.HBaseTest SparkHBaseExample-0.0.1-SNAPSHOT-jar-with-dependencies.jar
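
If you would rather use sbt than Eclipse/Maven for packaging, a minimal build.sbt for this example might look like the sketch below. The versions are assumptions, so align them with the Spark, Scala and HBase versions on your cluster, and you would still need an assembly plugin (for example sbt-assembly) to produce a fat jar like the one used above.

name := "SparkHBaseExample"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.5" // assumption: match the Scala version your Spark build uses

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.6.1" % "provided", // assumed Spark version
  "org.apache.hbase" %  "hbase-client" % "1.1.2",              // assumed HBase version
  "org.apache.hbase" %  "hbase-common" % "1.1.2",
  "org.apache.hbase" %  "hbase-server" % "1.1.2"               // provides TableInputFormat
)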
 
 
Once the Spark job is complete, verify that '/user/root/spark/HbaseFinalOut' contains the output of your code.
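
As a quick sanity check (a sketch assuming you can open spark-shell on the same cluster), you can read the saved output back from HDFS and print it:

// Read the saved text output back from HDFS and print each line
sc.textFile("/user/root/spark/HbaseFinalOut").collect().foreach(println)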
 
 
