Use Spark to write data to HBase

In the "Spark read the data in the Hbase" article I introduced how to read the data in the Spark in the Spark , and provides two versions of Java and Scala implementation, this article will be described later how to calculate through the Spark Of the data stored in the Hbase . Spark built-in provides two methods can be written to the data Hbase: (1), saveAsHadoopDataset; (2), saveAsNewAPIHadoopDataset, their official introduction are as follows:
saveAsHadoopDataset: Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
saveAsNewAPIHadoopDataset: Output the RDD to any Hadoop-supported storage system with the new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
As you can see, these two APIs correspond to the old mapred API and the new mapreduce API, respectively, and this article provides code for both. Before writing any code, first add the following dependencies to the pom.xml file:
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.1</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
 
<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
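
If your project uses sbt rather than Maven, the equivalent dependencies could be declared roughly like this (a sketch, assuming the same versions as in the pom.xml above):

// build.sbt sketch with the same artifacts and versions as the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hbase" % "hbase"           % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-client"    % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-common"    % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-server"    % "0.98.2-hadoop2"
)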

saveAsHadoopDataset

package com.qunar.bigdata.hbase

import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/////////////////////////////////////////////////////////////////////
// User: 过往记忆
// Date: 2016-11-29
// Time: 22:59
// Blog: https://www.iteblog.com
// Original post: https://www.iteblog.com/archives/1892
// 过往记忆 is a technical blog focused on Hadoop, Hive, Spark, Shark and Flume
// WeChat public account: iteblog_hadoop
/////////////////////////////////////////////////////////////////////

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)

    val input = sc.textFile(args(0))

    //Create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    //Create a JobConf and set the output format and target table name
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //Save to the HBase table
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
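
Note that the target table must already exist before the job runs; TableOutputFormat does not create it. As a minimal sketch (my addition, assuming the HBase 0.98 client API used above), the iteblog table with column family f1 could be created like this:

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}

//Sketch: create the target table with column family f1 if it does not exist yet
object CreateHBaseTable {
  def main(args: Array[String]) {
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val admin = new HBaseAdmin(hConf)
    if (!admin.tableExists("iteblog")) {
      val desc = new HTableDescriptor(TableName.valueOf("iteblog"))
      desc.addFamily(new HColumnDescriptor("f1"))
      admin.createTable(desc)
    }
    admin.close()
  }
}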
The input data format is as follows:
0015A49A8F2A60DACEE0160545C58F94    1234
0152C9666B5F3426DDDB5FB74BDBCE4F    4366
0160D90AC268AEB595208E8448C7F8B8    6577
0225A39EB29BDB582CA58BE86791ACBC    1234
02462ACDF7232C49890B07D63B50C5E1    4366
030730EBE05740C992840525E35BC8AD    7577
038A459BC05F3B655F5655C810E76352    7577
0417D3FD71458C4BAD1E5AFDE7259930    7577
042CD42B657C46D0D4E5CC69AFDD7E54    7577
051069378849ACF97BFAD09D3A9C7702    7577
05E833C9C763A98323E0328DA0A31039    7577
060E206514A24D944305D370F615F8E9    7577
087E8488796C29E1C8239565666CE2D7    7577
09A425F1DD240A7150ECEFAA0BFF25FA    7577
0B27E3CB5F3F32EB3715DB8E2D333BED    7577
0B27E82A4CEE73BBB98438DFB0DB2FFE    7577
0BAEEB7A12DCEF20EE26D7A030164DFF    7577
0C5BFC45F64907A61ECB1C892F98525C    7577
0C74F2FFD1BB3598BC8DB10C37DBA6B4    7577
0C9CEE40DDD961C7D2BBE0491FDF92A8    7577
0CC578371622F932287EB81065F81F5F    7577
0D6B03EFDAE7165A0F7CC79EABEAC0D3    7577
0DF7B014187A9AB2F1049781592CC053    7577
0E67D8ABDB3749D58207A7B45FEA7F12    7577
0E866677E79A7843E0EDCF2BE0141911    7577
0EAF4A69BA3BF05E8EA75CC1287304A3    7577
0EE2969AE674DF5F8944B5EA2E97DBEC    7577
0FAA253D53BC6D831CF6E742147C3BED    7577
0FB92AC3DE664BFF40D334DA8EE97B85    7577
The first column (reversed in the code above) is used as the HBase row key, and the second column is stored as the value of the info qualifier in column family f1.
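
As a concrete illustration, here is what the map function above does to the first sample line. This is just a sketch that could be pasted into a Scala REPL with the HBase client jar on the classpath:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

//How one input line becomes a Put (values taken from the first sample line above)
val line = "0015A49A8F2A60DACEE0160545C58F94\t1234"
val Array(key, value) = line.split("\t")  //key = first column, value = "1234"
val rowKey = key.reverse                  //"49F85C5450610EECAD06A2F8A94A5100" becomes the row key
val put = new Put(Bytes.toBytes(rowKey))
put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))  //f1:info = "1234"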

saveAsNewAPIHadoopDataset

package com.qunar.bigdata.hbase

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}


object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
    //Set the job's output format
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    //Save to the HBase table
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
This approach is almost identical to the first one; choose whichever fits your project. Still, both versions are a bit verbose for simply writing Spark data to HBase. In a later article I will describe how to write the data in an RDD to HBase directly, with a call such as saveToHbase; stay tuned.
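
To give a rough idea of what such an API could look like, here is a minimal, hypothetical sketch built on saveAsHadoopDataset; the name saveToHbase, its signature, and the hard-coded f1/info column are my assumptions, not the implementation the later article refers to:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._  //pair-RDD implicits, needed in Spark 0.9.x
import org.apache.spark.rdd.RDD

//Hypothetical helper: adds a saveToHbase method to RDD[(String, String)] via an implicit class
object HBaseImplicits {
  implicit class HBaseWriter(rdd: RDD[(String, String)]) {
    def saveToHbase(table: String, quorum: String): Unit = {
      val hConf = HBaseConfiguration.create()
      hConf.set(HConstants.ZOOKEEPER_QUORUM, quorum)
      val jobConf = new JobConf(hConf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
      rdd.map { case (key, value) =>
        val put = new Put(Bytes.toBytes(key))
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
        (new ImmutableBytesWritable, put)
      }.saveAsHadoopDataset(jobConf)
    }
  }
}

//Usage (after import HBaseImplicits._):
//  val pairs = input.map { line => val Array(k, v) = line.split("\t"); (k.reverse, v) }
//  pairs.saveToHbase("iteblog", "www.iteblog.com:2181")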
