Read/Write Data from HBase Using Spark
Apache Spark is an open-source distributed computing framework for processing large datasets. Spark is built mainly around the concept of the RDD.
An RDD is an immutable, distributed collection of objects partitioned across a cluster.
Two types of operations can be performed on an RDD (a short illustration follows the list):
1. Transformation
2. Action
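To make the distinction concrete, here is a minimal, self-contained sketch on a plain RDD (no HBase involved; the sample values and variable names are only for illustration): a transformation such as filter is lazy and merely describes a new RDD, while an action such as collect actually triggers the computation and returns the result to the driver.

// Transformation vs. action on a plain RDD (illustrative values only)
val numbers = sc.parallelize(Seq(53, 56, 65, 53))  // build an RDD from a local collection
val under55 = numbers.filter(_ < 55)               // transformation: lazy, no job runs yet
val result  = under55.collect()                    // action: runs the job, returns Array(53, 53)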
Today we will see how we can use these transformations and actions on HBase data. Let's take the traditional 'Employee' example.
Pre-requisite: Make sure Spark, HBase and Hadoop are functioning properly.
We will be running the following list of operations on HBase data using Spark.
- Create a table in HBase.
- Write data into HBase.
- Read data from HBase.
- Apply a filter operation on the HBase data.
- Store the result in HDFS.
Query: Filter those records from the HBase data whose age is less than 55.
Expected output: We should get two records, both with age 53.
Below is the sample code for the above example:
package com.td.SparkHbaseDemo.SparkHBaseExample

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.spark._
import org.apache.hadoop.hbase.util.Bytes

object HBaseTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseEmployeeDemo")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "Employee"

    // Configuration settings for reading HBase data
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.addResource("/etc/hbase/conf/hbase-site.xml")

    // Create the table if it is not already present
    val hbaseAdmin = new HBaseAdmin(conf)
    if (!hbaseAdmin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("employeeInformation".getBytes()))
      hbaseAdmin.createTable(tableDesc)
    }

    // Insert data into the table
    val empTable = new HTable(conf, tableName)

    val empRecord1 = new Put("1".getBytes())
    empRecord1.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Shashi".getBytes())
    empRecord1.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "53".getBytes())
    empRecord1.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "Chief Architect".getBytes())
    empTable.put(empRecord1)

    val empRecord2 = new Put("2".getBytes())
    empRecord2.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Ankit".getBytes())
    empRecord2.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "56".getBytes())
    empRecord2.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "Data Architect".getBytes())
    empTable.put(empRecord2)

    val empRecord3 = new Put("3".getBytes())
    empRecord3.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Jitu".getBytes())
    empRecord3.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "65".getBytes())
    empRecord3.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "CEO".getBytes())
    empTable.put(empRecord3)

    val empRecord4 = new Put("4".getBytes())
    empRecord4.add("employeeInformation".getBytes(), "employeeName".getBytes(), "Chhaaaaaaya".getBytes())
    empRecord4.add("employeeInformation".getBytes(), "employeeAge".getBytes(), "53".getBytes())
    empRecord4.add("employeeInformation".getBytes(), "employeeDesignation".getBytes(), "Chief Architect".getBytes())
    empTable.put(empRecord4)

    empTable.flushCommits()

    // Read the table back as an RDD of (rowkey, Result) pairs
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Keep only the Result part of each pair
    val resultRDD = hBaseRDD.map(x => x._2)

    // Read individual columns from each Result into a (name, designation, age) tuple
    val employeeRDD = resultRDD.map(result => (
      Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeName"))),
      Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeDesignation"))),
      Bytes.toString(result.getValue(Bytes.toBytes("employeeInformation"), Bytes.toBytes("employeeAge")))))

    // Filter records whose age is less than 55
    val filteredAge = employeeRDD.filter(result => result._3.toDouble < 55)

    // Save the output to HDFS
    filteredAge.saveAsTextFile("/user/root/spark/HbaseFinalOut")

    sc.stop()
  }
}
I typically use Eclipse for packaging the code. Once you have successfully compiled and packaged it, transfer the jar to the cluster where Spark is running and use spark-submit to start the execution:
spark-submit --class com.td.SparkHbaseDemo.SparkHBaseExample.HBaseTest SparkHBaseExample-0.0.1-SNAPSHOT-jar-with-dependencies.jar
Once the Spark job is complete, verify that '/user/root/spark/HbaseFinalOut' contains the output of your code.
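For a quick sanity check from the spark-shell, one option (a small sketch, assuming the same output path used in the code above) is to read the saved text files back and print them:

// Read the filtered records back from HDFS and print them
val output = sc.textFile("/user/root/spark/HbaseFinalOut")
output.collect().foreach(println)  // expect the two employees with age 53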