Spark doBulkLoad HBase

package com.hun.scala

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.commons.codec.digest.DigestUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by hun on 2016/10/17.
  *
  * Finally got it working.
  */
object Test6 {

  def main(args: Array[String]): Unit = {

    // Create a SparkContext; the commented-out line uses the default configuration
    //val sc = new SparkContext(new SparkConf())
    val sc = new SparkContext("local", "app name")
    // HBase column family
    val columnFamily1 = "f1"
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "120.27.111.55")

    // Read the raw file, strip the angle brackets and deduplicate
    val res1 = sc.textFile("file:///E:/BaiduYunDownload/data1").map(x =>
      x.replaceAll("<|>", "")
    ).distinct()
    // Keep only the "REC" records
    val res2 = res1.filter(x =>
      x.contains("REC")
    )

    val sourceRDD = res2.flatMap { x =>
      val arg0 = x.split(",")
      // Turn "name=value" fields into "name,value" pairs and drop malformed fields
      val arg1 = arg0.map(y =>
        y.replaceFirst("=", ",")
      ).filter(s =>
        s.split(",").length > 1
      )
      // Build the row key: MD5 of the reversed timestamp (Long.MaxValue minus the
      // parsed date of field 11), concatenated with the value of field 17
      val sdf = new SimpleDateFormat("yyyyMMdd")
      val date = (Long.MaxValue - sdf.parse(arg0(11).replaceFirst("=", ",").split(",")(1)).getTime).toString
      val key = DigestUtils.md5Hex(date).concat(arg0(17).replaceFirst("=", ",").split(",")(1))

      val arg2 = arg1.map(z =>
        (key, (columnFamily1, z.split(",")(0), z.split(",")(1)))
      ).sorted

      arg2
    }
    val source = sourceRDD.sortBy(_._1)
    source.foreach(println)
    val date = new Date().getTime
    val rdd = source.map { x =>
      // Convert the RDD into the format an HFile needs: as defined above, the HFile key
      // is an ImmutableBytesWritable, so the RDD key must be an ImmutableBytesWritable
      // instance and the value a KeyValue instance
      val rowKey = x._1
      val family = x._2._1
      val column = x._2._2
      val value = x._2._3
      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)),
        new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes(family), Bytes.toBytes(column), date, Bytes.toBytes(value)))
    }

    rdd.foreach(print)

    // Temporary directory where the generated HFiles are written
    val stagingFolder = "file:///E:/BaiduYunDownload/data12"
    // Write the HFiles to that directory
    rdd.saveAsNewAPIHadoopFile(stagingFolder,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    // Once this has run, the generated HFiles are under stagingFolder

    // Import the HFiles into HBase; from here on it is all HBase API calls
    val load = new LoadIncrementalHFiles(conf)
    // HBase table name
    val tableName = "output_table"
    // Create an HBase connection from the default configuration, which in effect
    // supplies the address of the HBase master
    val conn = ConnectionFactory.createConnection(conf)
    // Get the table by its name
    val table: Table = conn.getTable(TableName.valueOf(tableName))
    try {
      // Get the region distribution of the HBase table
      // val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
      // Create a Hadoop MapReduce job
      val job = Job.getInstance(conf)
      // Set the job name
      job.setJobName("DumpFile")
      // This is the crucial part: because we are generating HFiles, the map output key
      // has to be ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      // and the map output value has to be KeyValue
      job.setMapOutputValueClass(classOf[KeyValue])
      // Configure HFileOutputFormat2
      //HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
      HFileOutputFormat2.configureIncrementalLoadMap(job, table)

      // Start the bulk load
      load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
    } finally {
      table.close()
      conn.close()
    }
  }

}


With this data source, the bulk load at first failed with an error.

    The error is generally:

    java.io.IOException: Added a key not lexically larger than previous key=\x00\x02Mi\x0BsearchIndexuserId\x00\x00\xD5\xD6\xF3\xA3\x04, lastkey=\x00\x01w\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04


    The root cause: when generating HFile files yourself, the keys must be sorted. Puts are sorted automatically by HBase, but HFiles you build yourself are not sorted for you.

    Everything must be sorted, in order, by:

    row key

    column family

    column qualifier

    The sorting has to be done manually before the HFile is generated (a minimal sketch follows below); otherwise the bulk load only produces the error above.
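
    Here is a minimal sketch of that sort, written against the RDD from the listing above; it reuses the source, stagingFolder and conf values defined there, and the sortedRdd, hfileRdd and ts names are only illustrative. It sorts on the composite key (row key, family, qualifier) before writing the HFiles:

    // Sort by the composite key (row key, family, qualifier), the order in which
    // HFileOutputFormat2 expects the cells to arrive
    val sortedRdd = source.sortBy { case (rowKey, (family, qualifier, _)) =>
      (rowKey, family, qualifier)
    }

    // One timestamp for all cells, as in the listing above
    val ts = new Date().getTime

    // Re-emit the cells as (ImmutableBytesWritable, KeyValue) pairs, already in sorted order
    val hfileRdd = sortedRdd.map { case (rowKey, (family, qualifier, value)) =>
      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)),
        new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes(family),
          Bytes.toBytes(qualifier), ts, Bytes.toBytes(value)))
    }

    // Write the HFiles exactly as before
    hfileRdd.saveAsNewAPIHadoopFile(stagingFolder,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)

    Note that comparing Scala Strings matches HBase's byte order only for ASCII row keys; if the keys can contain arbitrary bytes, sort on the serialized byte arrays instead (for example with an Ordering built on Bytes.compareTo).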
