Reading HBase Data with Spark

If you want to keep up with articles on Spark, Hadoop or HBase, follow the WeChat public account: iteblog_hadoop
We are probably all familiar with the two most common ways Spark reads data into an RDD: (1) call the parallelize function on an existing collection to turn it into an RDD. The Java version is as follows:
JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3));
Scala version is as follows:
val myRDD= sc.parallelize(List(1,2,3))
This is a simple and convenient way to turn an in-memory collection into an RDD, and it is mostly useful for initializing test data. More often we use (2): reading data from files into an RDD. The file can be a plain text file or a sequence file, and it can live on the local filesystem (file://), in HDFS (hdfs://), or on S3. In fact, Spark supports all the file formats and storage locations that Hadoop supports. The Java version is as follows:
/////////////////////////////////////////////////////////////////////
// User: 过往记忆
// Date: 14-6-29
// Time: 23:59
// Blog: https://www.iteblog.com
// Article URL: https://www.iteblog.com/archives/1051
// 过往记忆 is a technical blog focused on Hadoop, Hive, Spark, Shark and Flume.
// WeChat public account: iteblog_hadoop
/////////////////////////////////////////////////////////////////////
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addFile("wyp.data");
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));
Scala version is as follows:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkFiles

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))
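Note that you do not have to stage the file with addFile/SparkFiles; textFile also accepts a path directly, including the file:// and hdfs:// prefixes mentioned above. A minimal sketch in Scala (the paths below are placeholders, adjust them for your environment):
// Read directly from the local filesystem or from HDFS; both paths are examples only.
val localLines = sc.textFile("file:///tmp/wyp.data")
val hdfsLines = sc.textFile("hdfs://namenode:8020/user/iteblog/wyp.data")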
In practice, the data we need may not be sitting in HDFS as plain text; it may be stored in HBase instead. So how do we use Spark to read data from HBase? All of the tests in this article are based on Hadoop 2.2.0, HBase 0.98.2 and Spark 0.9.1; the code may differ slightly for other versions. This article only shows how to read HBase data with Spark; if you need to perform more powerful operations on HBase, it may not help you much. Without further ado, the core Java code for reading HBase from Spark is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;


JavaSparkContext sc = new JavaSparkContext(master, "hbaseTest",
                System.getenv("SPARK_HOME"), System.getenv("JARS"));

Configuration conf = HBaseConfiguration.create();
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf"));
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"));

try {
        String tableName = "flight_wap_order_log";
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String ScanToString = Base64.encodeBytes(proto.toByteArray());
        conf.set(TableInputFormat.SCAN, ScanToString);

        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = 
                sc.newAPIHadoopRDD(conf,  TableInputFormat.class, 
                ImmutableBytesWritable.class, Result.class);

} catch (Exception e) {
            e.printStackTrace();
}
The code above reads the airName column in the cf column family from the HBase table named flight_wap_order_log. We can then operate on myRDD as usual, for example counting its rows:
System.out.println(myRDD.count());
Building this code requires the following dependencies in pom.xml:
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.1</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>0.98.2-hadoop2</version>
</dependency>
Scala version is as follows:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat


object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], 
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }
}
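The Scala version above scans the whole table and only counts the rows. If you want to restrict the scan to a single column, as the Java version does, you can serialize a Scan in exactly the same way, and then map over the Result objects to pull out the actual values. Below is a minimal sketch that reuses sc and conf from the code above and assumes the same cf:airName column as the Java example (adjust the family and qualifier for your own table):
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}

// Restrict the scan to the cf:airName column and serialize it for TableInputFormat,
// just like the Java version. This must be set on conf before newAPIHadoopRDD is called.
val scan = new Scan()
scan.addFamily(Bytes.toBytes("cf"))
scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("airName"))
conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

val filteredRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Pull the airName value out of every Result (null if the column is missing) and print a few rows.
val airNames = filteredRDD.map { case (_, result) =>
  Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("airName")))
}
airNames.take(10).foreach(println)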
The sbt dependencies we need are as follows:
libraryDependencies ++= Seq(
        "org.apache.spark" % "spark-core_2.10" % "0.9.1",
        "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
        "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)
When testing, you need to configure the HBase and Hadoop environments correctly, otherwise the program will run into problems; in particular, make sure the program can find the hbase-site.xml configuration file.
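If hbase-site.xml is not already on the classpath, one option is to load it into the Configuration explicitly. A minimal sketch, assuming the file lives under /etc/hbase/conf (a placeholder path, point it at your own installation):
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

// Load hbase-site.xml explicitly; the path is only an example.
val conf = HBaseConfiguration.create()
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
// conf can then be passed to newAPIHadoopRDD as in the examples above.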
