Accessing HBase table data from Spark using HBase filters

Copyright notice: This is the blogger's original article and may not be reproduced without the blogger's permission.

I tried to implement my own utility class.

What this class does: you call its internal method and it

selects the columns to display, based on the given table name, column family, and column names;

filters the data with an HBase filter, based on the given column family, column name, column value, and comparison operator.

Return value: SQLContext

The temporary table has already been registered, so you can call the sql method directly and query and process the data with SQL. (A usage sketch follows the code listing below.)

The code is as follows:


package cn.deppon.Tool

import java.util

import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, ResultScanner, Scan}
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by DreamBoy on 2017/5/12.
  */
object SparkHbaseTool {

  /**
    * Use the primary constructor to set up the basic runtime environment.
    */
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  // Spark parameters
  val conf = new SparkConf().setMaster("local[2]")
    .setAppName("HbaseTest")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val hbaseConf = HBaseConfiguration.create()
  val sqlContext = new SQLContext(sc)

  // HBase configuration
  hbaseConf.set("hbase.rootdir", "hdfs://192.168.10.228/hbase")
  hbaseConf.set("hbase.zookeeper.quorum", "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.master", "192.168.10.230")

  // Serialize the Scan so it can be passed to TableInputFormat via the configuration
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  /**
    *
    * @param tbl_nm     table name
    * @param show_col   _1 column family, _2 column name, _3 column type (String, Int, Double, Timestamp...)
    * @param filter_col _1 column family, _2 column name, _3 value to compare against, _4 comparison operator (=, <, >, !=...)
    * @return sqlcontext
    */
  def getTableNm(tbl_nm: String, show_col: Array[(String, String, String)], filter_col: Array[(String, String, String, String)]): (SQLContext) = {

    hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)
    val table = new HTable(hbaseConf, tbl_nm)
    val scan = new Scan()

    /**
      * Specify the column family and the columns to display;
      * add every column that will be used.
      */
    /*
    val length = show_col.length
    for(i <- show_col){
      scan.addColumn(Bytes.toBytes(i._1), Bytes.toBytes(i._2))
    }
    */
    // Restrict the rowkey range (start and stop row)
    //scan.setStartRow(Bytes.toBytes(""))
    //scan.setStopRow(Bytes.toBytes(""))
    val fil_len = filter_col.length
    println("------>>>>" + fil_len)
    // If no filter conditions were supplied, set an empty filter
    if (fil_len > 0) {
      val filter_arr = new util.ArrayList[Filter](fil_len)

      for (i <- filter_col) {
        i._4 match {
          case "=" => {
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
              Bytes.toBytes(i._2), CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case "<" => {
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
              Bytes.toBytes(i._2), CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case "<=" => {
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
              Bytes.toBytes(i._2), CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case ">" => {
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
              Bytes.toBytes(i._2), CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case ">=" => {
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
              Bytes.toBytes(i._2), CompareFilter.CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            //filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case "!=" => {
            val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
              Bytes.toBytes(i._2), CompareFilter.CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes(i._3)))
            filter1.setFilterIfMissing(true)
            filter_arr.add(filter1)
          }
          case _ => {}
        }
      }
      /**
        * A FilterList can combine several filters;
        * here every filter must pass (MUST_PASS_ALL).
        */
      val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)
      scan.setFilter(filterList)
    } else {
      scan.setFilter(null)
    }

    //hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    // Get a scanner over the table
    val ColumnValueScanner = table.getScanner(scan)
    // Build the list needed for the StructType; the schema is built from the type parameters passed in
    /*var list_col = show_col.map{x=>{
     /* x._3 match {
        case "String" => StructField(x._2,StringType,true)
        case "Int" => StructField(x._2,StringType,true)
        case "Double" => StructField(x._2,StringType,true)
        case "Timestamp" => StructField(x._2,StringType,true)
        case _ => StructField(x._2,StringType,true)
      }*/
      StructField(x._2,StringType,true)
    }
    }*/
    /**
      * Purpose of the StructType: it names each column of the DataFrame
      * created below, so the DataFrame can be registered as a table.
      */
    var list_col: List[StructField] = List()
    list_col :+= StructField("id", StringType, true)

    for (i <- show_col) {
      list_col :+= StructField(i._2, StringType, true)
    }

    // Build the table's StructType
    val schema = StructType(list_col)

    val tbl_rdd = ColumnValueScanner.iterator().asScala
    // Load the filter (the serialized scan) into hbaseConf
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    // Build the RDD
    val hbaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // Build the result rows from the RDD
    val rowRDD = hbaseRDD.map { case (_, result) => {
      var valueSeq: Seq[String] = Seq()
      // Get the row key
      val key = Bytes.toString(result.getRow)

      // Fetch columns by family and qualifier (variant without the rowkey):
      //      for(column <- columns) {
      //        valueSeq :+= Bytes.toString(result.getValue(family.getBytes, column.getBytes))
      //      }
      // Variant with the rowkey: the first column of the schema must be the rowkey
      valueSeq :+= key
      for (row <- show_col) {
        valueSeq :+= Bytes.toString(result.getValue(row._1.getBytes, row._2.getBytes))
      }
      Row.fromSeq(valueSeq)
    }
    }
    val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)

    hbasedataframe.registerTempTable(tbl_nm)
    sqlContext
  }
}
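Usage sketch: the call below shows how the tool might be invoked and queried with SQL. The table name "test_table", the columns "info:name" / "info:age", and the filter value are hypothetical examples, not from a real cluster; only the tuple shapes (family, name, type) and (family, name, value, operator) and the rowkey column "id" come from the tool above.

  object SparkHbaseToolDemo {
    def main(args: Array[String]): Unit = {
      // Columns to display: (column family, column name, column type)
      val showCols = Array(("info", "name", "String"), ("info", "age", "String"))
      // Filter conditions: (column family, column name, value, comparison operator)
      val filterCols = Array(("info", "age", "30", ">="))

      // Registers the temporary table "test_table" and returns the SQLContext
      val sqlContext = SparkHbaseTool.getTableNm("test_table", showCols, filterCols)

      // Query the registered table with SQL; "id" is the rowkey column the tool prepends
      sqlContext.sql("SELECT id, name, age FROM test_table").show()
    }
  }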
