Efficient bulk load of HBase using Spark

A simple process to demonstrate efficient bulk loading into HBase using Spark. The method used does not rely on additional dependencies, and results in a well partitioned HBase table with very high, or complete, data locality. This method should work with any version of Spark or HBase.

Introduction

Loading data into HBase using Spark can be done in a variety of ways, including:
It is inefficient to bulk load by writing through the region servers since it requires HBase to use resources such as the write-ahead log (WAL), the compaction and flush queues, server memory and will trigger a lot of garbage compaction, all of which can be bypassed as explained nicely in this Cloudera Blog. In this post I show a method that avoids this by pre-splitting data and using the standard HBase bulk loading tools. It is rather easy to implement but there are a few gotchas that can trip you up along the way. In demonstrating this process, I hope others may benefit and I'll explain some checks to perform along the way to help in understanding.
The general process involves:
  1. Estimate the total size of the data, and determine the optimal number of regions in HBase.
  2. Create an empty table, pre-split on the region boundaries - here we salt our keys in a predictable manner to achieve this.
  3. Use a simple custom partitioner in Spark to split the RDD to match the destination region splits.
  4. Generate the HFiles using Spark and standard Hadoop libraries.
  5. Load the data into HBase using the standard HBase command line bulk load tools.

Step 1: Prepare HBase Table (estimate data size and pre-split)

An HBase cluster is made up of region servers each serving partitions of one or more tables. These partitions are known as regions and represent a subset of the total rows in a table. The regions themselves are stored on disk as HFiles. If one writes through the region server API, as data grows the HFile would eventually breach a size threshold and the region would split into two (known as daughter regions) and the result would be two new HFiles each in a new directory on HDFS. The HBase bulk load tools allow you to skip all of that by generating HFiles elsewhere, move them into place on HDFS and then instruct HBase to serve them.
However, unless you size the regions correctly up front, as soon as new records come in, or when a major compaction runs, the threshold is checked and the HFile may be split immediately. If you have created a table without pre-splitting, you can easily create a table holding only 1 region with e.g. 1TB of data. As data is written, or major compaction runs, the region split is performed on a single machine and may take many hours. Note that this process will repeat on the daughter regions until they are below the threshold - it may take days.
To avoid this, we estimate the final size of the data that will be in HBase and create a pre-split table. You can read about sizing regions but a good starting point is 5-6GB. The default region size is 10GB and so aiming for approximately have this gives the table time to grow until splitting naturally in due course. To estimate the size of data, one typical approach is to take a sample of the input data, run the process once and then extrapolate - or you can simply take a guess and then refine later. To check the size of a table called ´map_data´ in HBase:
hdfs dfs -du -s -h /hbase/data/default/map_data
242.3 G  726.9 G  /hbase/data/default/map_data
Here the total size is 242.3GB and HDFS has a replication factor of three. If we aim for 6GB per region, then we need 41 regions if they are of equal size. Data is often skewed, so I'd recommend increasing this, say, to 45.
An easy way to achieve balanced region sizes is to use key salting with a modulus based approach. You do this by taking the absolute value of the hashcode of the key, the salt then becomes the modulus(numRegions) which is prefixed to the original key - e.g.:
scala> val key="species:797:1:1:1"
key: String = species:797:1:1:1

scala> val salt = Math.abs(key.hashCode % 45)
salt: Int = 4

scala> val saltedKey = "%02d".format(salt) + ":" + key     // leftpad with 0s
saltedKey: String = 04:species:797:1:1:1
With this in place we can create the table:
create 'map_data',
  { NAME => 'EPSG_3857', VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'FAST_DIFF'},
  {SPLITS => [
    '01','02','03', ...etc... '44'
  ]}
In this example we use
  1. Snappy compression (see compression in HBase) which is a good default choice. Note: region sizing is based on the compressed data size.
  2. FAST_DIFF data block encoding, since we have small payloads and expect our keys to be similar and therefore this is more data efficient (see Data block encoding types).
  3. A split strategy resulting in 45 regions.
  4. String based keys, left padding with 0's. In reality you would likely encode the salts more efficiently than this (e.g. a single byte) but using strings simplified readability here.

Step 2: Write HFiles in Spark (partition data to match the regions created)

With a correctly split target table in place, we now use Spark to create the HFiles. Spark uses the notion of Resilient Distributed Datasets (RDD), whereby a dataset is partitioned across worker nodes in the cluster to be operated on in parallel. What we aim to do is partition the RDD so that each partition will match a region in HBase, and then have the worker create an HFile for that partition. Since we have pre-split our table with a modulus-based salted key approach, we can partition the RDD using the same approach. The following (Scala) illustrates how to write a simple custom partitioner to achieve this.
// salt a key using a modulus based approach
def salt(key: String, modulus: Int) : String = {
  val saltAsInt = Math.abs(key.hashCode) % modulus

  // left pad with 0's (for readability of keys)
  val charsInSalt = digitsRequired(modulus)
  ("%0" + charsInSalt + "d").format(saltAsInt) + ":" + key
}

// number of characters required to encode the modulus in chars (01,02.. etc)
def digitsRequired(modulus: Int) : Int = {
  (Math.log10(modulus-1)+1).asInstanceOf[Int]
}

// A partitioner that puts data destined for the same HBase region together
class SaltPrefixPartitioner[K,V](modulus: Int) extends Partitioner {
  val charsInSalt = digitsRequired(modulus)

  def getPartition(key: Any): Int = {
    key.substring(0,charsInSalt).toInt
  }

  override def numPartitions: Int = modulus
}
Please note, that while we take a simple salting based approach here, a more generic approach would be to read the region boundaries from HBase and use those in the Spark partitioner. Such an approach would work with any table split strategy. This is the approach taken in the HBase v2.0 spark module.
Next we partition the RDD of key value pairs:
// salt the keys
val saltedRDD = sourceRDD.map(r => {
  (salt(r._1, 45), r._2)
})

// repartition and sort the data - HFiles want sorted data
val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(new SaltPrefixPartitioner(modulus))

// cells of data for HBase
val cells = partitionedRDD.map(r => {
    val saltedRowKey = Bytes.toBytes(r._1)
    val cellData = r._2

    // create a cell of data for HBase
    val cell = new KeyValue(
      saltedRowKey,
      Bytes.toBytes("epsg_3857"),  // column familily
      "tile", // column qualifier (i.e. cell name)
      cellData)

    (new ImmutableBytesWritable(saltedRowKey), cell)
}
Then write the HFiles:
// setup the HBase configuration
val baseConf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "<your ZK cluster here>");

// NOTE: job creates a copy of the conf
val job = new Job(baseConf, "map data")
val table = new HTable(conf, "map_data")
// Major gotcha(!) - see comments that follow
PatchedHFileOutputFormat2.configureIncrementalLoad(job, table);

val conf = job.getConfiguration // important(!)

// write HFiles onto HDFS
cells.saveAsNewAPIHadoopFile(
  "/tmp/map_data/hfiles",
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[PatchedHFileOutputFormat2],
  "map_data",
  conf)
})
The code above is simply setting up the Hadoop configuration, the serialization formats, the target directory and instructing Spark to save the HFiles. However, there are two things that can trip you up.
  1. Using the HFileOutputFormat, or the PatchedHFileOutputFormat2 seen here, to configure the config is advisable since it reads the HBase table metadata and will configure the compression and block encoding for us automatically. However, HFileOutputFormat takes a job in its interface, not a configuration object and it copies the configuration internally. You must therefore get the configuration back from the job afterwards, or else you will not have compression etc. enabled. It is easy to overlook this.
  2. We want data locality in HBase when moving the HFiles into place. That is to say, we want HDFS to locate the HFiles on the physical machines that are assigned by the HBase master to serve the files. HDFS has a concept of a Favoured Node hint which allows us to provide this. However, because of HBASE-12596 the hint is only used in HBase code versions for 2.0.0, 0.98.14 and 1.3.0. If you are running your Spark code using HBase dependencies for 1.0, 1.1 or 1.2 you will not receive this and you will achieve only random data (i.e. low) locality! This is very bad for performance. I have provided a backported version which will provide this hint and is used in the code above. It is reassuring to notice that the HBase v2.0 spark-module code does the same approach.
Once the HFiles are written, I'd recommend a couple of basic checks before loading them in:
  1. Inspect the HDFS directory hdfs dfs -du -h /tmp/map_data to confirm you have the correct number of files, and the expected size of HFiles.
  2. Inspect a few HFiles using the HBase tool. We're looking to see the compression we set and for each HFile to have keys with one of the prefix salts (02 in this example) e.g.:
# print sample of keys and the metadata
hbase hfile -e -m -f /tmp/map_data/EPSG_3857/...
...
K: 02:species:797:1:1:0:/EPSG_3857:tile/1476880136948/Put/vlen=48/seqid=0
K: 02:species:797:1:1:1:/EPSG_3857:tile/1476880136948/Put/vlen=48/seqid=0
...
compression=snappy,
...

Step 3: Load into HBase

We now have nicely partitioned HFiles, ready to load into our table:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /tmp/map_data map_data
Here we are using the HBase command line tool allowing us to review the HFiles before loading, but this call could be added into the Spark code to automate it.
During loading, should you see any log lines saying HFile no longer fits into a single partition splitting... it indicates that we got the sizing of the regions wrong, and will result in poor loading performance. It may also appear if the table was being written to and if a region split occurred while the data was being staged for loading. If that were to happen, the load class rearranges the staged file into the matching two daughter ranges, creating two new HFiles in the process. This rearranging happens on the machine executing the load and slows the process.
Using the HBase master user interface, the locality of the regions should all indicate 1.0 or very close to that.

Commentaires

Posts les plus consultés de ce blog

Controlling Parallelism in Spark by controlling the input partitions by controlling the input partitions

Spark performance optimization: shuffle tuning

Spark optimization