Spark reads HDFS into HBase (1.0.0 new API)

After seven years of development, HBase finally released version 1.0.0 at the end of February this year. This release brings some exciting features and, without sacrificing stability, introduces a new API. Although 1.0.0 remains compatible with the old API, it is worth getting familiar with the new one as soon as possible, and understanding how to combine it with the currently popular Spark to write and read data. Since there is still little material, in China or elsewhere, on the new HBase 1.0.0 API, I wrote this article.
This article has two parts. The first part explains basic CRUD operations with the new API. The second part explains how Spark writes RDDs into an HBase table and, conversely, how an HBase table is loaded into Spark as an RDD.

Environment configuration

To avoid trouble caused by version mismatches, both the API and the HBase environment are version 1.0.0. HBase runs in standalone mode; distributed mode is used in much the same way, and only the HBaseConfiguration settings need to change.
Use SBT to load the dependencies in the development environment:
    name := "SparkLearn"

    version := "1.0"

    scalaVersion := "2.10.4"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"

    libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.0.0"

    libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.0.0"

    libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.0.0"


Changes introduced in HBase 1.0

The new API adds Connection. HBaseAdmin is replaced by Admin and HTable by Table, and both Admin and Table can only be obtained through a Connection.
Creating a Connection is a heavyweight operation. Because Connection is thread-safe, using a single shared instance (a singleton) is recommended. The factory method takes a Configuration, typically created with HBaseConfiguration.create().
    // Create the connection through the factory
    Configuration config = HBaseConfiguration.create();
    Connection connection = ConnectionFactory.createConnection(config);

    // Table name
    String tableName = "Table";
    TableName tableNameObj = TableName.valueOf(tableName);

    // Obtain the other objects through the connection
    Table table = connection.getTable(tableNameObj);
    BufferedMutator mutator = connection.getBufferedMutator(tableNameObj);
    RegionLocator regionLocator = connection.getRegionLocator(tableNameObj);
    Admin admin = connection.getAdmin();

    // Close the connection only once it is no longer needed
    connection.close();
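The basic CRUD calls on a Table obtained this way can be sketched in Scala as follows. This is a minimal sketch, not the author's code: it assumes a running HBase that already has a table named "Table" with a column family "info", and the object names HBaseConn and CrudSketch, the row key "row1" and the qualifier "name" are made up for illustration.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Delete, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical holder: one lazily created Connection shared by the whole JVM
object HBaseConn {
  lazy val connection: Connection =
    ConnectionFactory.createConnection(HBaseConfiguration.create())
}

object CrudSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: table "Table" with column family "info" already exists
    val table = HBaseConn.connection.getTable(TableName.valueOf("Table"))
    try {
      val row = Bytes.toBytes("row1")
      val family = Bytes.toBytes("info")
      val qualifier = Bytes.toBytes("name")

      // Create / update: write one cell
      table.put(new Put(row).addColumn(family, qualifier, Bytes.toBytes("value1")))

      // Read: fetch the row and decode the cell
      val result = table.get(new Get(row))
      println(Bytes.toString(result.getValue(family, qualifier)))

      // Delete: remove the whole row
      table.delete(new Delete(row))
    } finally {
      table.close()                // Table instances are lightweight; close after use
      HBaseConn.connection.close() // close the shared Connection at shutdown
    }
  }
}
```

Keeping the Connection in a lazy val means it is created once on first use, while each Table is opened and closed around the work that needs it.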

Detailed procedure

To write data to HBase, we use PairRDDFunctions.saveAsHadoopDataset. Because HBase is not a file system, the saveAsHadoopFile method does not apply.
    def saveAsHadoopDataset(conf: JobConf): Unit
This method takes a JobConf as its parameter, which acts like a set of configuration items; the main things to specify are the output format and the output table name.
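Step 1: Read the HDFS file contents
File format:
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val readtxt = sc.textFile("hdfs://")
    // Split each line on ";" (the result name `rdd` is assumed here) and
    // fall back to "" when the eighth field is missing
    val rdd = readtxt.map(_.split(";")).map(p =>
      (p(0), p(1), p(2), p(3), p(4), p(5), p(6),
        try { p(7) } catch { case ex: ArrayIndexOutOfBoundsException => "" }))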
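One likely reason for the try/catch around p(7) is that String.split drops trailing empty strings, so a line whose last field is empty yields only seven elements. A minimal alternative sketch, assuming every record should end up with exactly eight fields (the names ParseLineSketch and parseLine are hypothetical, not from the original article), keeps trailing empties with a negative limit and pads short rows:

```scala
object ParseLineSketch {
  // Hypothetical helper: split one ";"-separated line into exactly 8 fields
  def parseLine(line: String): (String, String, String, String, String, String, String, String) = {
    // limit = -1 keeps the trailing empty strings that split(";") would drop;
    // padTo fills any fields that are still missing with ""
    val p = line.split(";", -1).padTo(8, "")
    (p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7))
  }
}
```

With this helper, the parsing step could be written as readtxt.map(ParseLineSketch.parseLine) without catching an exception per record.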
Step 2: Create the HBase table
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "")
    conf.set("", "2181")
    conf.set("hbase.defaults.for.version.skip", "true")
    // conf.set(TableInputFormat.INPUT_TABLE, "b_user")
    val conn = ConnectionFactory.createConnection(conf)
    val admin = conn.getAdmin // get the Admin (used to manage the database)
    val tabname = TableName.valueOf("recommend_base_data_day")
    // Drop the table first if it already exists
    if (admin.tableExists(tabname)) {
      admin.disableTable(tabname)
      admin.deleteTable(tabname)
    }
    val descrip = new HTableDescriptor(tabname)
    descrip.addFamily(new HColumnDescriptor("info".getBytes())) // create a column family named info
    admin.createTable(descrip)
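Step 3: Configure the HBase output table
    // TableOutputFormat here is org.apache.hadoop.hbase.mapred.TableOutputFormat,
    // the variant that works with JobConf
    val jobconf = new JobConf(conf, this.getClass)
    jobconf.setOutputFormat(classOf[TableOutputFormat])
    jobconf.set(TableOutputFormat.OUTPUT_TABLE, "recommend_base_data_day")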
Step 4: Data conversion function
    def convert(triple: (String, String, String, String, String, String, String, String)) = {
      // The first field is the row key; the others become columns in family "info"
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("platform_id"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("dt"), Bytes.toBytes(triple._3))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(triple._4))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_ip"), Bytes.toBytes(triple._5))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("goods_name"), Bytes.toBytes(triple._6))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("total_num"), Bytes.toBytes(triple._7))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("etl_insert"), Bytes.toBytes(triple._8))
      (new ImmutableBytesWritable, p)
    }
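The convert function repeats the same addColumn call for each field. To make the field-to-column layout explicit, the following sketch in plain Scala (no HBase calls; the names ColumnLayoutSketch, qualifiers and toCells are hypothetical) treats the first field as the row key and pairs each remaining field with the qualifier that convert uses for it:

```scala
object ColumnLayoutSketch {
  // Qualifiers in the same order that convert uses for fields 2 to 8
  val qualifiers: Seq[String] = Seq(
    "platform_id", "dt", "user_id", "user_ip", "goods_name", "total_num", "etl_insert")

  // Hypothetical helper: returns (row key, Seq of (qualifier, value))
  def toCells(fields: Seq[String]): (String, Seq[(String, String)]) = {
    require(fields.length == qualifiers.length + 1, "expected 8 fields")
    (fields.head, qualifiers.zip(fields.tail))
  }
}
```

Inside convert, each (qualifier, value) pair would correspond to one p.addColumn(Bytes.toBytes("info"), Bytes.toBytes(qualifier), Bytes.toBytes(value)) call.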
Step 5: Execute and view the results
 val localData 
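The listing for this step is cut off in the source. As a rough, self-contained sketch of how the earlier pieces are typically combined, and not the author's original code, the write could look like the following. It assumes a standalone HBase on the default local ZooKeeper, an existing table recommend_base_data_day with column family info, and a small in-memory sample standing in for the HDFS input; the object name WriteSketch and the sample rows are made up.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (required before Spark 1.3)

object WriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HBaseTest").setMaster("local"))

    // Same JobConf setup as in Step 3
    val jobconf = new JobConf(HBaseConfiguration.create(), this.getClass)
    jobconf.setOutputFormat(classOf[TableOutputFormat])
    jobconf.set(TableOutputFormat.OUTPUT_TABLE, "recommend_base_data_day")

    // Hypothetical sample rows: (row key, dt)
    val sample = sc.parallelize(Seq(("row1", "2015-01-01"), ("row2", "2015-01-02")))

    // Turn each record into (ImmutableBytesWritable, Put) and write it to HBase
    sample.map { case (key, dt) =>
      val p = new Put(Bytes.toBytes(key))
      p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("dt"), Bytes.toBytes(dt))
      (new ImmutableBytesWritable, p)
    }.saveAsHadoopDataset(jobconf)

    sc.stop()
  }
}
```

After such a job finishes, the rows can be checked from the HBase shell with scan 'recommend_base_data_day'.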
The main content is reproduced from: http://

