Integration of Spark and HBase

Preface

Because HBase is a scalable and durable KV database, I previously used it only for storing metrics (see my earlier article on storing real-time computed metrics from Storm into HBase). Now HBase is also used to store user behavior, because rowkey-based filtering works very well: you can easily filter out all the behavior along the user or content dimension. In a sense, HBase is a storage engine with one, and only one, multi-field composite index.

Although I favor real-time computing, batch processing is still indispensable for backfilling data or computing over historical data. For historical data I had two options: compute from the behavior data already stored in HBase, or compute from the raw data in Hive. I chose the former, which means batch processing over HBase with Spark (StreamingPro).
Integration process

Integrating with Spark ideally means having a Schema (mapping), because the DataFrame and SQL APIs require one. Unfortunately, HBase has no fixed schema; it depends on the user and the scenario. Spark-on-HBase connectors therefore usually require you to define a mapping (Schema). For example, hortonworks SHC ( https://github.com/hortonworks-spark/shc ) requires a catalog definition like the following:


{
  "rowkey": "key",
  "table": {"namespace": "default", "name": "pi_user_log", "tableCoder": "PrimitiveType"},
  "columns": {
    "col0": {"cf": "rowkey", "col": "key", "type": "string"},
    "col1": {"cf": "f", "col": "col1", "type": "string"}
  }
}

The definition above is easy to read: an HBase table plus a list of its columns, which lets you expose it through the Spark DataSource API (for how to develop a Spark DataSource, see my article on implementing a REST data source with the Spark DataSource API; SHC is essentially that kind of API). You can then use it like this:
 
val cat = "{\n\"rowkey\":\"key\",\"table\":{\"namespace\":\"default\", \"name\":\"pi_user_log\", \"tableCoder\":\"PrimitiveType\"},\n\"columns\":{\"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n\"28360592\":{\"cf\":\"f\",\"col\":\"28360592\", \"type\":\"string\"}\n}\n}"
val cc = sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog -> cat))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
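
Once loaded, the DataFrame behaves like any other. As a quick usage sketch (the table and column names follow the catalog above; the query itself is just illustrative), you can register it as a temporary table and query it with Spark SQL:

// Register the HBase-backed DataFrame as a temp table and query it (Spark 1.x API)
cc.registerTempTable("pi_user_log")
sqlContext.sql("select col0 from pi_user_log limit 10").show()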
 
But when you have thousands of columns this breaks down: you cannot define them one by one, and often you do not even know which columns exist; the column name may itself be a timestamp. We have run into several such cases, so we needed to solve two things:
  1. Automatically fetch all the columns in the HBase table to form a Schema, so that users do not need to configure anything.
  2. Treat an HBase table as having only two columns: a rowkey and a content column, where content is a map whose keys are column family + column and whose values are the cell contents.
Let's start with the second approach (the first approach actually depends on it):


{
  "name": "batch.sources",
  "params": [
    {
      "inputTableName": "log1",
      "format": "org.apache.spark.sql.execution.datasources.hbase.raw",
      "path": "-",
      "outputTable": "log1"
    }
  ]
},
{
  "name": "batch.sql",
  "params": [
    {
      "sql": "select rowkey,json_value_collect(content) as actionList from log1",
      "outputTableName": "finalTable"
    }
  ]
},

First, we configure an HBase table named log1. The program picks up the HBase connection from hbase-site.xml, which is why no HBase connection settings appear in the configuration. Then, in SQL, you can work on the content column; here I turn the content into a JSON-formatted string, on top of which you can write your own UDFs and the like to implement your more complex business logic. Since every field is carried inside that JSON, I don't care about the column names as long as I can get at all the columns, and the example above does exactly what I need.
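
For illustration, here is the kind of UDF you might register yourself (a hypothetical sketch: cell_count is an invented name, not the json_value_collect used above, and it assumes the log1 table is available): it parses the JSON content string and counts how many cells the row carries.

// Hypothetical UDF: parse the JSON content string and count the
// "family:qualifier" -> value entries it contains
import net.liftweb.{json => SJSon}
sqlContext.udf.register("cell_count", (content: String) => SJSon.parse(content).children.size)
sqlContext.sql("select rowkey, cell_count(content) as cells from log1").show()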

Implementing this HBase DataSource is also quite simple; the core logic is as follows:
 
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

import scala.collection.JavaConversions._

case class HBaseRelation(
                          parameters: Map[String, String],
                          userSpecifiedschema: Option[StructType]
                        )(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan with Logging {

  // HBase connection settings come from hbase-site.xml on the classpath
  val hbaseConf = HBaseConfiguration.create()

  // The relation exposes exactly two string columns: the rowkey and the JSON-encoded content
  override def schema: StructType = StructType(Seq(
    StructField("rowkey", StringType),
    StructField("content", StringType)
  ))

  def buildScan(): RDD[Row] = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, parameters("inputTableName"))
    val hBaseRDD = sqlContext.sparkContext.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .map { line =>
        val rowKey = Bytes.toString(line._2.getRow)

        import net.liftweb.{json => SJSon}
        implicit val formats = SJSon.Serialization.formats(SJSon.NoTypeHints)

        // Flatten every cell of the row into a "family:qualifier" -> value map
        val content = line._2.getMap.navigableKeySet().flatMap { f =>
          line._2.getFamilyMap(f).map { c =>
            (Bytes.toString(f) + ":" + Bytes.toString(c._1), Bytes.toString(c._2))
          }
        }.toMap

        // Serialize the map to a JSON string so downstream SQL only sees two columns
        val contentStr = SJSon.Serialization.write(content)

        Row.fromSeq(Seq(UTF8String.fromString(rowKey), UTF8String.fromString(contentStr)))
      }
    hBaseRDD
  }
}
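
To make this relation reachable through the format name used in the configuration above (org.apache.spark.sql.execution.datasources.hbase.raw), Spark looks for a DefaultSource class in that package. A minimal sketch of that glue, which may differ from the actual wiring in StreamingPro:

package org.apache.spark.sql.execution.datasources.hbase.raw

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider}

// Spark resolves the format string to <package>.DefaultSource, which simply
// hands the user-supplied options to HBaseRelation (assumed to be in scope here)
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation = {
    HBaseRelation(parameters, None)(sqlContext)
  }
}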
 
So how do we get Spark to infer the Schema automatically? In general you would still have to scan all the data to collect the full set of columns and then build a Schema from them, which is very expensive. Alternatively, since we have already turned each row into JSON, we can simply rely on Spark's built-in JSON support to infer the Schema for us.
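
A minimal sketch of that idea (assuming the two-column log1 table produced by the raw source above; names are illustrative): pull the JSON content column out as an RDD of strings and let Spark's JSON reader infer the schema.

// Extract the JSON "content" column as an RDD[String] and let Spark infer a schema from it
val jsonRdd = sqlContext.table("log1").select("content").rdd.map(_.getString(0))
val inferred = sqlContext.read.json(jsonRdd)
inferred.printSchema()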

Generally speaking, we do not encourage batch-processing HBase with Spark, because it can easily overload HBase: for example, memory pressure may bring a RegionServer down, and worst of all, once a RegionServer goes down, reads and writes are unavailable for a while. Since HBase is very often the storage behind real-time online services, the impact can be significant.
