Spark + HBase: scanning a table with filters into a Spark SQL DataFrame
package cn.deppon.Tool

import java.util

import scala.collection.JavaConverters._

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, ResultScanner, Scan}
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Created by DreamBoy on 2017/5/12.
 */
object SparkHbaseTool {
  /**
   * The object body acts as the primary constructor: it sets up the Spark
   * and HBase environment shared by the methods below.
   */
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

  // Spark configuration
  val conf = new SparkConf().setMaster("local[2]")
    .setAppName("HbaseTest")
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  val sc = new SparkContext(conf)
  val hbaseConf = HBaseConfiguration.create()
  val sqlContext = new SQLContext(sc)

  // HBase configuration
  hbaseConf.set("hbase.rootdir", "hdfs://192.168.10.228/hbase")
  hbaseConf.set("hbase.zookeeper.quorum", "192.168.10.228,192.168.10.229,192.168.10.230,192.168.10.231,192.168.10.232")
  hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  hbaseConf.set("hbase.master", "192.168.10.230")
  /** Serialize a Scan into the Base64 string expected by TableInputFormat.SCAN. */
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
  /**
   * Scan an HBase table and register it as a temporary table for Spark SQL.
   *
   * @param tbl_nm     table name
   * @param show_col   _1 column family, _2 column name, _3 column type (String, Int, Double, Timestamp, ...)
   * @param filter_col _1 column family, _2 column name, _3 filter value, _4 comparison operator (=, <, >, !=, ...)
   * @return the SQLContext with the table registered under tbl_nm
   */
  def getTableNm(tbl_nm: String, show_col: Array[(String, String, String)], filter_col: Array[(String, String, String, String)]): SQLContext = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tbl_nm)
    val table = new HTable(hbaseConf, tbl_nm)
    val scan = new Scan()
    /**
     * Optionally restrict the scan to specific column families and qualifiers
     * by adding one addColumn call per column to display.
     */
    /*
    for (i <- show_col) {
      scan.addColumn(Bytes.toBytes(i._1), Bytes.toBytes(i._2))
    }
    */

    // Optionally restrict the rowkey range (start and stop row)
    //scan.setStartRow(Bytes.toBytes(""))
    //scan.setStopRow(Bytes.toBytes(""))
    val fil_len = filter_col.length
    println("------>>>>" + fil_len)

    // Build one SingleColumnValueFilter per filter tuple; if no filter was
    // provided, leave the scan without a filter.
    if (fil_len > 0) {
      val filter_arr = new util.ArrayList[Filter](fil_len)
      for (i <- filter_col) {
        // Map the operator string onto the corresponding HBase CompareOp
        val compareOp = i._4 match {
          case "="  => Some(CompareFilter.CompareOp.EQUAL)
          case "<"  => Some(CompareFilter.CompareOp.LESS)
          case "<=" => Some(CompareFilter.CompareOp.LESS_OR_EQUAL)
          case ">"  => Some(CompareFilter.CompareOp.GREATER)
          case ">=" => Some(CompareFilter.CompareOp.GREATER_OR_EQUAL)
          case "!=" => Some(CompareFilter.CompareOp.NOT_EQUAL)
          case _    => None // unknown operator: skip this entry
        }
        compareOp.foreach { op =>
          val filter1 = new SingleColumnValueFilter(Bytes.toBytes(i._1),
            Bytes.toBytes(i._2), op, new BinaryComparator(Bytes.toBytes(i._3)))
          // Drop rows that do not contain the filtered column at all
          filter1.setFilterIfMissing(true)
          filter_arr.add(filter1)
        }
      }
      /**
       * A FilterList combines several filters; MUST_PASS_ALL means a row has
       * to satisfy every filter (logical AND).
       */
      val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, filter_arr)
      scan.setFilter(filterList)
    } else {
      scan.setFilter(null)
    }
    //hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    // Open a scanner on the table (not used for the RDD below, which reads
    // through TableInputFormat instead)
    val ColumnValueScanner = table.getScanner(scan)

    /*
     * A per-type mapping (String, Int, Double, Timestamp, ...) could be built
     * here from show_col._3; for simplicity every column is read as StringType.
     */
    /**
     * The StructType names each value when the DataFrame is created below,
     * so the column names can be used once it is registered as a table.
     * The first field, "id", holds the rowkey.
     */
    var list_col: List[StructField] = List()
    list_col :+= StructField("id", StringType, true)
    for (i <- show_col) {
      list_col :+= StructField(i._2, StringType, true)
    }
    // Build the table schema
    val schema = StructType(list_col)
    val tbl_rdd = ColumnValueScanner.iterator().asScala

    // Pass the scan (including its filters) to TableInputFormat through hbaseConf
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    // Build the RDD of (rowkey, Result) pairs
    val hbaseRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // Turn each Result into a Row that matches the schema above
    val rowRDD = hbaseRDD.map { case (_, result) =>
      var valueSeq: Seq[String] = Seq()
      // Rowkey
      val key = Bytes.toString(result.getRow)
      // Without-rowkey variant: read only the requested columns
      // for (column <- columns) {
      //   valueSeq :+= Bytes.toString(result.getValue(family.getBytes, column.getBytes))
      // }
      // With-rowkey variant: the rowkey must come first, matching the "id" field
      valueSeq :+= key
      for (row <- show_col) {
        valueSeq :+= Bytes.toString(result.getValue(row._1.getBytes, row._2.getBytes))
      }
      Row.fromSeq(valueSeq)
    }
    val hbasedataframe = sqlContext.createDataFrame(rowRDD, schema)
    hbasedataframe.registerTempTable(tbl_nm)
    sqlContext
  }
}
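
A minimal usage sketch, for context: the table name "user", the column family "info", and the qualifiers and filter values below are hypothetical, chosen only to show the shape of the show_col and filter_col arguments. getTableNm registers the scanned table under its name, so it can then be queried through Spark SQL.

object SparkHbaseToolDemo {
  def main(args: Array[String]): Unit = {
    // Columns to expose: (family, qualifier, type); every column is read as String
    val show_col = Array(("info", "name", "String"), ("info", "age", "String"))
    // Filters: (family, qualifier, value, operator); combined with MUST_PASS_ALL (AND)
    val filter_col = Array(("info", "age", "30", ">="))

    // Registers a temp table named "user" and returns the shared SQLContext
    val sqlContext = SparkHbaseTool.getTableNm("user", show_col, filter_col)
    sqlContext.sql("SELECT id, name, age FROM user").show()
  }
}

Note that the comparison is done by BinaryComparator on the raw bytes of the value string, so filters behave lexicographically rather than numerically.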