Spark reads HDFS into HBase (with the new 1.0.0 API)
After seven years of development, HBase finally released version 1.0.0 at the end of February this year. This release provides some exciting features and, without sacrificing stability, introduces a new API. Although 1.0.0 is compatible with the old API, you should still get familiar with the new API as soon as possible, and understand how to combine it with the currently popular Spark to read and write data. Since there is little material, at home or abroad, on the new HBase 1.0.0 API, hence this article.
This article is split into two parts: the first part explains basic CRUD operations with the new API; the second part explains how Spark writes RDDs into an HBase table and, conversely, how an HBase table is loaded into Spark as an RDD.
Environment configuration
To avoid unnecessary trouble caused by version inconsistencies, both the API and the HBase environment use version 1.0.0.
HBase runs in standalone mode here; distributed mode is used in much the same way, you only need to adjust the HBaseConfiguration settings accordingly.
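For reference, a minimal sketch of what that change looks like when pointing at a distributed cluster (the ZooKeeper host names below are placeholders, not part of the original setup):

import org.apache.hadoop.hbase.HBaseConfiguration

// Standalone mode can rely on the local defaults; for a distributed cluster,
// point the client at the cluster's ZooKeeper ensemble instead.
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
conf.set("hbase.zookeeper.property.clientPort", "2181")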
Dependencies are loaded with SBT in the development environment:
name := "SparkLearn"version := "1.0"scalaVersion := "2.10.4"libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.0.0"libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.0.0"libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.0.0
Changes introduced in HBase 1.0
The new API introduces Connection; HBaseAdmin becomes Admin, HTable becomes Table, and Admin and Table instances can only be obtained through a Connection.
Creating a Connection is a heavyweight operation. Because Connection is thread-safe, a singleton is recommended; the factory method takes an HBaseConfiguration.
// Create a connection through the factory
Connection connection = ConnectionFactory.createConnection(config);

// Table name
String tableName = "Table";
TableName tableNameObj = TableName.valueOf(tableName);

// Obtain Table, BufferedMutator, RegionLocator and Admin through the connection
Table table = connection.getTable(tableNameObj);
BufferedMutator mutator = connection.getBufferedMutator(tableNameObj);
RegionLocator regionLocator = connection.getRegionLocator(tableNameObj);
Admin admin = connection.getAdmin();

// Close the connection when done
connection.close();
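Since the first part of this article is about basic CRUD with the new API, here is a minimal put/get sketch in Scala under the same assumptions (the table name, row key, qualifier and values below are placeholders):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("Table"))

// Put: write one cell under the column family "info"
val put = new Put(Bytes.toBytes("row1"))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"))
table.put(put)

// Get: read the same cell back
val get = new Get(Bytes.toBytes("row1"))
val result = table.get(get)
val value = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("qualifier")))

table.close()
connection.close()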
Specific process
To write data into HBase, we first need to use PairRDDFunctions.saveAsHadoopDataset; since HBase is not a file system, the saveAsHadoopFile method is of no use here.
def saveAsHadoopDataset(conf: JobConf): Unit

This method takes a JobConf as a parameter, which acts like a configuration item; mainly you need to specify the output format and the output table name.
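In practice this means the RDD handed to saveAsHadoopDataset must consist of (ImmutableBytesWritable, Put) pairs, and the TableOutputFormat used with a JobConf comes from the org.apache.hadoop.hbase.mapred package (the old mapred API), not org.apache.hadoop.hbase.mapreduce. A minimal sketch of the required shape, assuming sc is an existing SparkContext and conf an HBaseConfiguration (the table name and values are placeholders):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

val jobConf = new JobConf(conf, this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "some_table")

// The element type is the key point: (ImmutableBytesWritable, Put)
val pairs = sc.parallelize(Seq("row1")).map { rk =>
  val p = new Put(Bytes.toBytes(rk))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"))
  (new ImmutableBytesWritable, p)
}
pairs.saveAsHadoopDataset(jobConf)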
Step 1: Read the HDFS file contents

File format: each line holds eight semicolon-separated fields (a row key followed by platform_id, dt, user_id, user_ip, goods_name, total_num, etl_insert); the last field may be missing.

Code:
val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
val sc = new SparkContext(sparkConf)
val readtxt = sc.textFile("hdfs://127.0.0.1:8020/user/hdfs/from_mysql/recommend_base_data_day/data_20160222")
val rddpro = readtxt.map(_.split(";")).map(p => (
  p(0), p(1), p(2), p(3), p(4), p(5), p(6),
  try { p(7) } catch { case ex: ArrayIndexOutOfBoundsException => "" }
))
Step 2: Create the HBase table
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "127.0.0.1")
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("hbase.defaults.for.version.skip", "true")
// conf.set(TableInputFormat.INPUT_TABLE, "b_user")

val conn = ConnectionFactory.createConnection(conf)
val admin = conn.getAdmin // obtain Admin (for operating on the database)
val tabname = TableName.valueOf("recommend_base_data_day")
if (admin.tableExists(tabname)) {
  admin.disableTable(tabname)
  admin.deleteTable(tabname)
}
val descrip = new HTableDescriptor(tabname)
descrip.addFamily(new HColumnDescriptor("info".getBytes())) // add a column family named "info"
admin.createTable(descrip)
Step 3: Configure the HBase output table
val jobconf = new JobConf(conf, this.getClass)
jobconf.setOutputFormat(classOf[TableOutputFormat])
jobconf.set(TableOutputFormat.OUTPUT_TABLE, "recommend_base_data_day")
Step 4: Data conversion function
def convert(triple: (String, String, String, String, String, String, String, String)) = {
  val p = new Put(Bytes.toBytes(triple._1))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("platform_id"), Bytes.toBytes(triple._2))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("dt"), Bytes.toBytes(triple._3))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_id"), Bytes.toBytes(triple._4))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("user_ip"), Bytes.toBytes(triple._5))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("goods_name"), Bytes.toBytes(triple._6))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("total_num"), Bytes.toBytes(triple._7))
  p.addColumn(Bytes.toBytes("info"), Bytes.toBytes("etl_insert"), Bytes.toBytes(triple._8))
  (new ImmutableBytesWritable, p)
}
Step 5: Execute and view the results
val localData = rddpro.map(convert)
localData.saveAsHadoopDataset(jobconf)
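To view the results from Spark itself, one option (a minimal sketch, assuming the same conf and sc as above) is to load the table back as an RDD via newAPIHadoopRDD with TableInputFormat from org.apache.hadoop.hbase.mapreduce:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Tell TableInputFormat which table to scan, then load it as an RDD of (row key, Result).
conf.set(TableInputFormat.INPUT_TABLE, "recommend_base_data_day")
val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Print a few rows: the row key plus the "user_id" column of the "info" family.
hbaseRDD.take(10).foreach { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  val userId = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("user_id")))
  println(s"$rowKey -> $userId")
}

Alternatively, a quick scan from the HBase shell works just as well for spot-checking.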
The main content is reproduced from: http://www.wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/