Use Spark to write data to HBase
In the "Spark read the data in the Hbase" article I introduced how to read the data in the Spark in the Spark , and provides two versions of Java and Scala implementation, this article will be described later how to calculate through the Spark Of the data stored in the Hbase .
Spark provides two built-in methods for writing data to HBase: (1) saveAsHadoopDataset and (2) saveAsNewAPIHadoopDataset. Their official descriptions are as follows:
saveAsHadoopDataset: Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.

saveAsNewAPIHadoopDataset: Output the RDD to any Hadoop-supported storage system with the new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.

As you can see, these two APIs target the old mapred API and the new mapreduce API respectively. This article provides code for both versions. Before writing the code, we first add the required dependencies to the pom.xml file:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
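If your project uses sbt rather than Maven, the equivalent declarations would look roughly like this (a sketch only; it simply mirrors the artifact IDs and versions from the pom.xml above):

// build.sbt -- hypothetical sbt equivalent of the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hbase"  % "hbase"          % "0.98.2-hadoop2",
  "org.apache.hbase"  % "hbase-client"   % "0.98.2-hadoop2",
  "org.apache.hbase"  % "hbase-common"   % "0.98.2-hadoop2",
  "org.apache.hbase"  % "hbase-server"   % "0.98.2-hadoop2"
)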
saveAsHadoopDataset
package com.qunar.bigdata.hbase

import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

/////////////////////////////////////////////////////////////////////
// Author: 过往记忆   Date: 2016-11-29  Time: 22:59
// Blog: https://www.iteblog.com
// Original post: https://www.iteblog.com/archives/1892
/////////////////////////////////////////////////////////////////////
object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    // Create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    // Create the JobConf, set the output format and the table name
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    // Save to the HBase table
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

The input data format is as follows (the two columns are separated by a tab):
0015A49A8F2A60DACEE0160545C58F94    1234
0152C9666B5F3426DDDB5FB74BDBCE4F    4366
0160D90AC268AEB595208E8448C7F8B8    6577
0225A39EB29BDB582CA58BE86791ACBC    1234
02462ACDF7232C49890B07D63B50C5E1    4366
030730EBE05740C992840525E35BC8AD    7577
038A459BC05F3B655F5655C810E76352    7577
0417D3FD71458C4BAD1E5AFDE7259930    7577
042CD42B657C46D0D4E5CC69AFDD7E54    7577
051069378849ACF97BFAD09D3A9C7702    7577
05E833C9C763A98323E0328DA0A31039    7577
060E206514A24D944305D370F615F8E9    7577
087E8488796C29E1C8239565666CE2D7    7577
09A425F1DD240A7150ECEFAA0BFF25FA    7577
0B27E3CB5F3F32EB3715DB8E2D333BED    7577
0B27E82A4CEE73BBB98438DFB0DB2FFE    7577
0BAEEB7A12DCEF20EE26D7A030164DFF    7577
0C5BFC45F64907A61ECB1C892F98525C    7577
0C74F2FFD1BB3598BC8DB10C37DBA6B4    7577
0C9CEE40DDD961C7D2BBE0491FDF92A8    7577
0CC578371622F932287EB81065F81F5F    7577
0D6B03EFDAE7165A0F7CC79EABEAC0D3    7577
0DF7B014187A9AB2F1049781592CC053    7577
0E67D8ABDB3749D58207A7B45FEA7F12    7577
0E866677E79A7843E0EDCF2BE0141911    7577
0EAF4A69BA3BF05E8EA75CC1287304A3    7577
0EE2969AE674DF5F8944B5EA2E97DBEC    7577
0FAA253D53BC6D831CF6E742147C3BED    7577
0FB92AC3DE664BFF40D334DA8EE97B85    7577

The first column (reversed by the program) is used as the HBase row key, and the second column is stored as the value of the f1:info column.
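The examples write into a table named iteblog with a single column family f1, so that table must exist before the job runs. A minimal sketch of creating it with the HBase 0.98 admin API (the ZooKeeper quorum is the same placeholder host used in the code above):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin

object CreateIteblogTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val admin = new HBaseAdmin(conf)
    try {
      if (!admin.tableExists("iteblog")) {
        // Table with a single column family "f1", as used by the Spark jobs
        val desc = new HTableDescriptor(TableName.valueOf("iteblog"))
        desc.addFamily(new HColumnDescriptor("f1"))
        admin.createTable(desc)
      }
    } finally {
      admin.close()
    }
  }
}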
saveAsNewAPIHadoopDataset
package com.qunar.bigdata.hbase

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkContext, SparkConf}

object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    // Set the job's output format and key/value classes
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    // Save to the HBase table
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}

This method is almost the same as the first one; you can choose whichever fits your needs. However, writing Spark data into HBase this way is still a bit verbose, so later I will describe how to write the data in an RDD directly to HBase through something like a saveToHbase method. Stay tuned.
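As a hypothetical illustration of what such a saveToHbase helper could look like, here is a sketch of an implicit class that wraps the saveAsHadoopDataset boilerplate from the first example (the names HBaseSyntax, RichHBaseRDD, and the parameter list are assumptions, not an existing Spark or HBase API):

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark versions
import org.apache.spark.rdd.RDD

object HBaseSyntax {
  // Hypothetical helper, not part of Spark or HBase: it just hides the
  // JobConf/Put boilerplate shown in the first example above.
  implicit class RichHBaseRDD(rdd: RDD[(String, String)]) {
    def saveToHbase(quorum: String, table: String, family: String, qualifier: String): Unit = {
      val hConf = HBaseConfiguration.create()
      hConf.set(HConstants.ZOOKEEPER_QUORUM, quorum)
      val jobConf = new JobConf(hConf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
      val puts = rdd.map { case (rowKey, value) =>
        val put = new Put(Bytes.toBytes(rowKey))
        put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))
        (new ImmutableBytesWritable, put)
      }
      puts.saveAsHadoopDataset(jobConf)
    }
  }
}

With HBaseSyntax._ in scope, the core of the first example would reduce to something like input.map { line => val Array(k, v) = line.split("\t"); (k.reverse, v) }.saveToHbase("www.iteblog.com:2181", "iteblog", "f1", "info").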