spark hbasefilter hbase

  1. package cn.deppon.Tool  
  2.   
  3. import java.util  
  4.   
  5. import scala.collection.JavaConverters._  
  6. import org.apache.hadoop.hbase.HBaseConfiguration  
  7. import org.apache.hadoop.hbase.client.{HTable, ResultScanner, Scan}  
  8. import org.apache.hadoop.hbase.filter._  
  9. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  10. import org.apache.hadoop.hbase.protobuf.ProtobufUtil  
  11. import org.apache.hadoop.hbase.util.{Base64, Bytes}  
  12. import org.apache.log4j.{Level, Logger}  
  13. import org.apache.spark.sql.{Row, SQLContext}  
  14. import org.apache.spark.sql.types._  
  15. import org.apache.spark.{SparkConf, SparkContext}  
  16.   
  17. /** 
  18.   * Created by DreamBoy on 2017/5/12. 
  19.   */  
  20. object SparkHbaseTool {  
  21.   
  22.   /** 
  23.     * 利用主构造器构造需要的环境的基本条件 
  24.     */  
  25.   Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)  
  26.   //设置spark参数  
  27.   val conf = new SparkConf().setMaster("local[2]")  
  28.     .setAppName("HbaseTest")  
  29.   conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")  
  30.   val sc = new SparkContext(conf)  
  31.   val hbaseConf = HBaseConfiguration.create()  
  32.   val sqlContext = new SQLContext(sc)  
  33.   
  34.   //配置HBase  
  35.   hbaseConf.set("hbase.rootdir""hdfs://http://192.168.10.228/hbase")  
  36.   hbaseConf.set("hbase.zookeeper.quorum""192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")  
  37.   hbaseConf.set("hbase.zookeeper.property.clientPort""2181")  
  38.   hbaseConf.set("hbase.master""192.168.10.230")  
  39.   
  40.   def convertScanToString(scan: Scan) = {  
  41.     val proto = ProtobufUtil.toScan(scan)  
  42.     Base64.encodeBytes(proto.toByteArray)  
  43.   }  
  44.   
  45.   /** 
  46.     * 
  47.     * @param tbl_nm     表名 
  48.     * @param show_col   _1 列族  _2列名  _3 列类型(String,Int,Double,Timestamp...) 
  49.     * @param filter_col _1 列族  _2列名  _3 筛选值  _4 筛选类型(=,<,>,!=...) 
  50.     * @return sqlcontext 
  51.     */  
  52.   def getTableNm(tbl_nm: String, show_col: Array[(String, String, String)], filter_col: Array[(String, String, String, String)]): (SQLContext) = {  
  53.   
  54.     hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)  
  55.     val table = new HTable(hbaseConf, tbl_nm)  
  56.     val scan = new Scan()  
  57.   
  58.     /** 
  59.       * 指定列族和需要显示的列名 
  60.       * 添加多个需要用到的列 
  61.       */  
  62.     /* 
  63.     val length = show_col.length 
  64.     for(i <- show_col){ 
  65.       scan.addColumn(Bytes.toBytes(i._1),Bytes.toBytes(i._2)) 
  66.     } 
  67.     */  
  68.     //设置rowkey的范围,启示和结束  
  69.     //scan.setStartRow(Bytes.toBytes(""))  
  70.     //scan.setStopRow(Bytes.toBytes(""))  
  71.     val fil_len = filter_col.length  
  72.     println("------>>>>" + fil_len)  
  73.     //如果没有添加过滤器,就给过滤器添加空  
  74.     if (fil_len > 0) {  
  75.       val filter_arr = new util.ArrayList[Filter](fil_len)  
  76.   
  77.       for (i <- filter_col) {  
  78.         i._4 match {  
  79.           case "=" => {  
  80.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  81.               Bytes.toBytes(i._2), CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  82.             filter1.setFilterIfMissing(true)  
  83.             filter_arr.add(filter1)  
  84.           }  
  85.           case "<" => {  
  86.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  87.               Bytes.toBytes(i._2), CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes(i._3)))  
  88.             filter1.setFilterIfMissing(true)  
  89.             filter_arr.add(filter1)  
  90.           }  
  91.           case "<=" => {  
  92.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  93.               Bytes.toBytes(i._2), CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  94.             filter1.setFilterIfMissing(true)  
  95.             filter_arr.add(filter1)  
  96.           }  
  97.           case ">" => {  
  98.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  99.               Bytes.toBytes(i._2), CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes(i._3)))  
  100.             filter1.setFilterIfMissing(true)  
  101.             filter_arr.add(filter1)  
  102.           }  
  103.           case ">=" => {  
  104.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  105.               Bytes.toBytes(i._2), CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  106.             //filter1.setFilterIfMissing(true)  
  107.             filter_arr.add(filter1)  
  108.           }  
  109.           case "!=" => {  
  110.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  111.               Bytes.toBytes(i._2), CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  112.             filter1.setFilterIfMissing(true)  
  113.             filter_arr.add(filter1)  
  114.           }  
  115.           case _ => {}  
  116.         }  
  117.       }  
  118.       /** 
  119.         * 通过使用filterlist可以加载多个过滤器 
  120.         * 设置多个过滤器 
  121.         */  
  122.       val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)  
  123.       scan.setFilter(filterList)  
  124.     } else {  
  125.       scan.setFilter(null)  
  126.     }  
  127.   
  128.     //hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))  
  129.     //获取表的扫描  
  130.     val ColumnValueScanner = table.getScanner(scan)  
  131.     //构建structtype需要的list  根据传入的类型参数构建表  
  132.     /*var list_col = show_col.map{x=>{ 
  133.      /* x._3 match { 
  134.         case "String" => StructField(x._2,StringType,true) 
  135.         case "Int" => StructField(x._2,StringType,true) 
  136.         case "Double" => StructField(x._2,StringType,true) 
  137.         case "Timestamp" => StructField(x._2,StringType,true) 
  138.         case _ => StructField(x._2,StringType,true) 
  139.       }*/  
  140.       StructField(x._2,StringType,true)  
  141.     }  
  142.     }*/  
  143.     /** 
  144.       * structType构造的目的:为在后面产生dataframe的时候指定每个值的列名 
  145.       * 在注册成表的时候可以使用 
  146.       */  
  147.     var list_col: List[StructField] = List()  
  148.     list_col :+= StructField("id", StringType, true)  
  149.       
  150.     for (i <- show_col) {  
  151.       list_col :+= StructField(i._2, StringType, true)  
  152.     }  
  153.   
  154.       
  155.     //构建表的structType  
  156.     val schema = StructType(list_col)  
  157.   
  158.     val tbl_rdd = ColumnValueScanner.iterator().asScala  
  159.     //把过滤器加载到hbaseconf中  
  160.     hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))  
  161.     //构建RDD  
  162.     val hbaseRDD = sc.newAPIHadoopRDD(  
  163.       hbaseConf,  
  164.       classOf[TableInputFormat],  
  165.       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  166.       classOf[org.apache.hadoop.hbase.client.Result])  
  167.     //构建rdd的结果集  
  168.     val rowRDD = hbaseRDD.map { case (_, result) => {  
  169.       var valueSeq: Seq[String] = Seq()  
  170.       //获取行键  
  171.       val key = Bytes.toString(result.getRow)  
  172.   
  173.       //通过列族和列名获取列  不加rowkey方法  
  174.       //      for(column <- columns) {  
  175.       //        valueSeq :+= Bytes.toString(result.getValue(family.getBytes, column.getBytes))  
  176.       //      }  
  177.       //加rowkey方法,Array第一列必须是"rowkey"  
  178.       valueSeq :+= key  
  179.       for (row <- show_col) {  
  180.         valueSeq :+= Bytes.toString(result.getValue(row._1.getBytes, row._2.getBytes))  
  181.       }  
  182.       Row.fromSeq(valueSeq)  
  183.     }  
  184.     }  
  185.     val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)  
  186.   
  187.     hbasedataframe.registerTempTable(tbl_nm)  
  188.     sqlContext  
  189.   }  
  190. }

Commentaires

  1. Thanks for sharing your innovative ideas to our vision. I have read your blog and I gathered some new information through your blog. Your blog is really very informative and unique. Keep posting like this. Awaiting for your further update.If you are looking for any apache spark scala related information, please visit our website Apache spark training institute in Bangalore

    RépondreSupprimer

Publier un commentaire

Posts les plus consultés de ce blog

Spark performance optimization: shuffle tuning

Spark optimization

Use Apache Spark to write data to ElasticSearch