Migrating an Apache HBase Table Between Different Clusters

In this post, we are going to look at some best practices for moving Apache HBase tables between different clusters of Pivotal HD, or other Hadoop distributions. Whether you are migrating to a new system, copying data between two business groups, or keeping HBase clusters in sync across different data centers, there are many different tools and methodologies for doing this. HBase comes with a number of helpful utilities to move data in and out of HBase, all of which are documented in the official HBase reference guide. The method I outline in this post is a tried-and-true approach that lets you copy an HBase table to a destination cluster while edits are still in flight.
After the bulk of your table has been copied, it is quick and simple to play “catch-up” (i.e., copy any new transactions since the initial export) and flip the switch to your new cluster. Other methods, like Hadoop’s DistCp utility, require the HBase table to be disabled, forcing your applications to wait while the full table is copied and re-enabled.
Let’s assume we have two clusters, Cluster A and Cluster B, and we want to migrate a large table from A to B. Say our business has multiple groups using the same cluster and we want to break a group off onto their own system. We’ve set up our new cluster, and it is time to migrate the group’s HBase tables over. In order to keep our applications functional during migration to this new system, we want a method that does not require disabling the tables during the copy. Using HBase’s CopyTable utility, we can perform an efficient and scalable copy to a destination HBase cluster while keeping the current table online. After we have copied the table, we can re-configure our applications to use the new cluster and copy back any new transactions using the same CopyTable utility.
Let’s look at a flow diagram and discuss it:
[Figure: HBase table migration flow diagram]
1. Gather the current table splits on Cluster A from the HBase Web UI
2. Create the table on Cluster B via the HBase shell, using the splits from Step 1
3. Use the CopyTable command line utility to copy the table from Cluster A to Cluster B
After this is done, you’ll have a copy of the table on both clusters. When you are ready to stop using the table on Cluster A, run the CopyTable utility again with the --starttime parameter to move over any updates or deletions that have occurred since the original export. We will take note of the time we start the initial copy so we can use it later. Alternatively, you can run a MapReduce job with a TableInputFormat on Cluster B to locate the row with the latest timestamp.
Let’s look at an example to demonstrate this process. We will be copying a table called statuses, which contains roughly twelve million tweets along with their metadata. The table on Cluster A has one column family, c, with ten regions hosted on five RegionServers. We will be migrating this table to Cluster B, which has ten RegionServers.
Here is some output of the HBase shell’s count command on Cluster A. Since this executes a full table scan from a single client, it will take a while on very large tables; you may want to skip this step, or use the MapReduce-based alternative shown after the output below.
[ajshook@phdA-1 ~]$ hbase shell
hbase(main):001:0> count 'statuses', INTERVAL => 5000000, CACHE => 1000
Current count: 5000000, row: 363405378597289984
Current count: 10000000, row: 399583893013417984
12008721 row(s) in 122.2150 seconds
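If you want a row count without tying up a single client, HBase also ships with a RowCounter MapReduce job that distributes the scan across the cluster; the total is reported in the job’s ROWS counter when it completes. A minimal invocation on Cluster A would look like this:
[ajshook@phdA-1 ~]$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter statuses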
1. Gather the current table splits on Cluster A
Before importing, we will need to create the table on Cluster B. There are many options you can pass in when creating a table, and one of the most important for a copy of this magnitude is pre-splitting the table. Otherwise, HBase will create a single region and all of your data will be written to it; as that region grows, HBase will repeatedly split it and compact the results, which is expensive. When we pre-split, the regions already exist and there will be fewer compactions, if any. Pre-splitting should always be done when writing a lot of data; the hard part is determining what the row keys for the splits should be.
Since we are migrating an existing table, determining the splits for statuses is straightforward. One way to see them is the HBase web UI, which you can reach by pointing a browser at your HBase Master host on port 60010 and clicking the link for your table. There you can also find all of the column family names (the values of the NAME entries), which you will need when creating the table. The table detail page lists each region with ‘Start Key’ and ‘End Key’ columns. Let’s take all of the values in the ‘End Key’ column and make them our split keys. Now we have our table splits for the destination cluster.
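If you prefer the shell to the web UI, you can also read the region boundaries out of HBase’s meta table. This is a sketch assuming a release where the meta table is named hbase:meta (older versions call it .META.); each info:regioninfo cell printed for the statuses rows contains that region’s STARTKEY and ENDKEY:
hbase(main):002:0> scan 'hbase:meta', {COLUMNS => ['info:regioninfo'], FILTER => "PrefixFilter('statuses,')"}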
2. Create the table on Cluster B via the HBase shell
The HBase shell is the simplest way to create an HBase table. After logging in by executing hbase shell on your client node, you can create a table using the create command:
[ajshook@phdB-1 ~]$ hbase shell
hbase(main):001:0> create 'test', 'cf1', 'cf2'
0 row(s) in 2.2660 seconds
This creates an HBase table called test with two column families, cf1 and cf2.
We will want to specify the table splits we gathered earlier. Let’s create our statuses table with a single column family, c, and all of those splits.
Note: Executing describe '<tablename>' in your source cluster’s HBase shell will show a dictionary of configuration parameters for that specific table. It may be a good idea to use this dictionary when creating the table on your destination cluster.
hbase(main):002:0> create 'statuses', 'c', {SPLITS => ['351096312705449984','352379274814169088','362887897977860097','363387615740825600','374194860753821696','374916654301327360','385809349891088384','396362382320369664','407120168121151489']}

0 row(s) in 2.4450 seconds
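If the describe output on Cluster A reported non-default settings for the c column family, you can carry them over by passing a family dictionary alongside the splits, as an alternative to the simple create above. The VERSIONS and COMPRESSION values here are hypothetical placeholders; substitute whatever your source table actually reports:
hbase(main):003:0> create 'statuses', {NAME => 'c', VERSIONS => 1, COMPRESSION => 'GZ'}, {SPLITS => ['351096312705449984','352379274814169088','362887897977860097','363387615740825600','374194860753821696','374916654301327360','385809349891088384','396362382320369664','407120168121151489']}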
Now we have our statuses table on Cluster B. Time to fill it with data.
3. Use the CopyTable utility to copy the table from Cluster A to Cluster B
On Cluster A, execute the following command to kick off a MapReduce job that copies the table. CopyTable accepts a number of optional parameters, such as a start timestamp, an end timestamp, a destination table name (if different from the one being copied), which column families to copy, and the number of versions of each cell to copy. The peer.adr parameter is required and specifies the destination cluster for the operation. It has the format <hbase.zookeeper.quorum>:<hbase.zookeeper.clientPort>:<zookeeper.znode.parent>, all of which can be found in the hbase-site.xml file of your destination cluster. The ZooKeeper client port defaults to 2181 and the ZNode parent defaults to /hbase; the quorum hosts depend on your installation. The last argument is the source table to copy, statuses.
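For reference, here is a sketch of an invocation that exercises a few of those optional parameters, copying only the c family into a differently named table on the destination; the ZooKeeper hostnames zk1, zk2, and zk3 are hypothetical placeholders:
hbase org.apache.hadoop.hbase.mapreduce.CopyTable --new.name=statuses_copy --families=c --versions=1 --peer.adr=zk1,zk2,zk3:2181:/hbase statuses
For our migration, we only need the required peer.adr parameter, as shown in the command further below.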
After the copy, we will run a ‘count’ via the HBase shell to demonstrate the successful import (something you may not want to do on your large tables).
Note: Simply executing hbase org.apache.hadoop.hbase.mapreduce.CopyTable with no arguments will print the usage message and the full list of supported options.
# Take note of the current number of milliseconds for our copy
[ajshook@phdA-1 ~]$ date +%s%N | cut -b1-13
1392665035863
[ajshook@phdA-1 ~]$ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --peer.adr=phdB-2,phdB-3,phdB-4:2181:/hbase statuses

... MapReduce log messages...
... Job Success!!! ...

# On our destination cluster phdB-1...
[ajshook@phdB-1 ~]$ hbase shell
hbase(main):001:0> count 'statuses', INTERVAL => 5000000, CACHE => 1000
Current count: 5000000, row: 363405378597289984
Current count: 10000000, row: 399583893013417984
12008721 row(s) in 118.0820 seconds
Congratulations! You have successfully copied an HBase table between two clusters. You can now reconfigure your applications to use the new cluster and run CopyTable again, specifying the --starttime parameter set to the millisecond timestamp we captured just before running the initial copy. This will copy over any transactions missed since the initial import.
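Using the timestamp we captured earlier (1392665035863), the catch-up run would look something like this, again launched from Cluster A:
[ajshook@phdA-1 ~]$ hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1392665035863 --peer.adr=phdB-2,phdB-3,phdB-4:2181:/hbase statuses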
