spark hbasefilter hbase

  1. package cn.deppon.Tool  
  2.   
  3. import java.util  
  4.   
  5. import scala.collection.JavaConverters._  
  6. import org.apache.hadoop.hbase.HBaseConfiguration  
  7. import org.apache.hadoop.hbase.client.{HTable, ResultScanner, Scan}  
  8. import org.apache.hadoop.hbase.filter._  
  9. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  10. import org.apache.hadoop.hbase.protobuf.ProtobufUtil  
  11. import org.apache.hadoop.hbase.util.{Base64, Bytes}  
  12. import org.apache.log4j.{Level, Logger}  
  13. import org.apache.spark.sql.{Row, SQLContext}  
  14. import org.apache.spark.sql.types._  
  15. import org.apache.spark.{SparkConf, SparkContext}  
  16.   
  17. /** 
  18.   * Created by DreamBoy on 2017/5/12. 
  19.   */  
  20. object SparkHbaseTool {  
  21.   
  22.   /** 
  23.     * 利用主构造器构造需要的环境的基本条件 
  24.     */  
  25.   Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)  
  26.   //设置spark参数  
  27.   val conf = new SparkConf().setMaster("local[2]")  
  28.     .setAppName("HbaseTest")  
  29.   conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")  
  30.   val sc = new SparkContext(conf)  
  31.   val hbaseConf = HBaseConfiguration.create()  
  32.   val sqlContext = new SQLContext(sc)  
  33.   
  34.   //配置HBase  
  35.   hbaseConf.set("hbase.rootdir""hdfs://http://192.168.10.228/hbase")  
  36.   hbaseConf.set("hbase.zookeeper.quorum""192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")  
  37.   hbaseConf.set("hbase.zookeeper.property.clientPort""2181")  
  38.   hbaseConf.set("hbase.master""192.168.10.230")  
  39.   
  40.   def convertScanToString(scan: Scan) = {  
  41.     val proto = ProtobufUtil.toScan(scan)  
  42.     Base64.encodeBytes(proto.toByteArray)  
  43.   }  
  44.   
  45.   /** 
  46.     * 
  47.     * @param tbl_nm     表名 
  48.     * @param show_col   _1 列族  _2列名  _3 列类型(String,Int,Double,Timestamp...) 
  49.     * @param filter_col _1 列族  _2列名  _3 筛选值  _4 筛选类型(=,<,>,!=...) 
  50.     * @return sqlcontext 
  51.     */  
  52.   def getTableNm(tbl_nm: String, show_col: Array[(String, String, String)], filter_col: Array[(String, String, String, String)]): (SQLContext) = {  
  53.   
  54.     hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)  
  55.     val table = new HTable(hbaseConf, tbl_nm)  
  56.     val scan = new Scan()  
  57.   
  58.     /** 
  59.       * 指定列族和需要显示的列名 
  60.       * 添加多个需要用到的列 
  61.       */  
  62.     /* 
  63.     val length = show_col.length 
  64.     for(i <- show_col){ 
  65.       scan.addColumn(Bytes.toBytes(i._1),Bytes.toBytes(i._2)) 
  66.     } 
  67.     */  
  68.     //设置rowkey的范围,启示和结束  
  69.     //scan.setStartRow(Bytes.toBytes(""))  
  70.     //scan.setStopRow(Bytes.toBytes(""))  
  71.     val fil_len = filter_col.length  
  72.     println("------>>>>" + fil_len)  
  73.     //如果没有添加过滤器,就给过滤器添加空  
  74.     if (fil_len > 0) {  
  75.       val filter_arr = new util.ArrayList[Filter](fil_len)  
  76.   
  77.       for (i <- filter_col) {  
  78.         i._4 match {  
  79.           case "=" => {  
  80.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  81.               Bytes.toBytes(i._2), CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  82.             filter1.setFilterIfMissing(true)  
  83.             filter_arr.add(filter1)  
  84.           }  
  85.           case "<" => {  
  86.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  87.               Bytes.toBytes(i._2), CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes(i._3)))  
  88.             filter1.setFilterIfMissing(true)  
  89.             filter_arr.add(filter1)  
  90.           }  
  91.           case "<=" => {  
  92.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  93.               Bytes.toBytes(i._2), CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  94.             filter1.setFilterIfMissing(true)  
  95.             filter_arr.add(filter1)  
  96.           }  
  97.           case ">" => {  
  98.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  99.               Bytes.toBytes(i._2), CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes(i._3)))  
  100.             filter1.setFilterIfMissing(true)  
  101.             filter_arr.add(filter1)  
  102.           }  
  103.           case ">=" => {  
  104.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  105.               Bytes.toBytes(i._2), CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  106.             //filter1.setFilterIfMissing(true)  
  107.             filter_arr.add(filter1)  
  108.           }  
  109.           case "!=" => {  
  110.             val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),  
  111.               Bytes.toBytes(i._2), CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))  
  112.             filter1.setFilterIfMissing(true)  
  113.             filter_arr.add(filter1)  
  114.           }  
  115.           case _ => {}  
  116.         }  
  117.       }  
  118.       /** 
  119.         * 通过使用filterlist可以加载多个过滤器 
  120.         * 设置多个过滤器 
  121.         */  
  122.       val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)  
  123.       scan.setFilter(filterList)  
  124.     } else {  
  125.       scan.setFilter(null)  
  126.     }  
  127.   
  128.     //hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))  
  129.     //获取表的扫描  
  130.     val ColumnValueScanner = table.getScanner(scan)  
  131.     //构建structtype需要的list  根据传入的类型参数构建表  
  132.     /*var list_col = show_col.map{x=>{ 
  133.      /* x._3 match { 
  134.         case "String" => StructField(x._2,StringType,true) 
  135.         case "Int" => StructField(x._2,StringType,true) 
  136.         case "Double" => StructField(x._2,StringType,true) 
  137.         case "Timestamp" => StructField(x._2,StringType,true) 
  138.         case _ => StructField(x._2,StringType,true) 
  139.       }*/  
  140.       StructField(x._2,StringType,true)  
  141.     }  
  142.     }*/  
  143.     /** 
  144.       * structType构造的目的:为在后面产生dataframe的时候指定每个值的列名 
  145.       * 在注册成表的时候可以使用 
  146.       */  
  147.     var list_col: List[StructField] = List()  
  148.     list_col :+= StructField("id", StringType, true)  
  149.       
  150.     for (i <- show_col) {  
  151.       list_col :+= StructField(i._2, StringType, true)  
  152.     }  
  153.   
  154.       
  155.     //构建表的structType  
  156.     val schema = StructType(list_col)  
  157.   
  158.     val tbl_rdd = ColumnValueScanner.iterator().asScala  
  159.     //把过滤器加载到hbaseconf中  
  160.     hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))  
  161.     //构建RDD  
  162.     val hbaseRDD = sc.newAPIHadoopRDD(  
  163.       hbaseConf,  
  164.       classOf[TableInputFormat],  
  165.       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  166.       classOf[org.apache.hadoop.hbase.client.Result])  
  167.     //构建rdd的结果集  
  168.     val rowRDD = hbaseRDD.map { case (_, result) => {  
  169.       var valueSeq: Seq[String] = Seq()  
  170.       //获取行键  
  171.       val key = Bytes.toString(result.getRow)  
  172.   
  173.       //通过列族和列名获取列  不加rowkey方法  
  174.       //      for(column <- columns) {  
  175.       //        valueSeq :+= Bytes.toString(result.getValue(family.getBytes, column.getBytes))  
  176.       //      }  
  177.       //加rowkey方法,Array第一列必须是"rowkey"  
  178.       valueSeq :+= key  
  179.       for (row <- show_col) {  
  180.         valueSeq :+= Bytes.toString(result.getValue(row._1.getBytes, row._2.getBytes))  
  181.       }  
  182.       Row.fromSeq(valueSeq)  
  183.     }  
  184.     }  
  185.     val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)  
  186.   
  187.     hbasedataframe.registerTempTable(tbl_nm)  
  188.     sqlContext  
  189.   }  
  190. }

Commentaires

Posts les plus consultés de ce blog

Controlling Parallelism in Spark by controlling the input partitions by controlling the input partitions

Spark performance optimization: shuffle tuning

Spark optimization