Sunday, January 29, 2012

Filters in HBase (or intra row scanning part II)

Filters in HBase are a somewhat obscure and under-documented feature. (Even we committers are often not aware of their usefulness - see HBASE-5229 and HBASE-4256... Or maybe it's just me...).

Intra row scanning can be done using ColumnRangeFilter. Other filters such as ColumnPrefixFilter or MultipleColumnPrefixFilter might also be handy for this. All three filters have in common that they can provide scanners (see scanning in hbase) with what I will call "seek hints". These hints allow a scanner to seek to the next column, the next row, or an arbitrary next cell determined by the filter. This is far more efficient than a dumb filter that is passed each cell and merely decides whether the cell is included in the result or not.

Many other filters also provide these "seek hints". The exceptions are filters that filter on column values: since there is no inherent ordering between column values, these filters need to look at the value of each column.

For example check out this code in MultipleColumnPrefixFilter (ASF 2.0 license):
    TreeSet<byte []> lesserOrEqualPrefixes =
      (TreeSet<byte []>) sortedPrefixes.headSet(qualifier, true);
    if (lesserOrEqualPrefixes.size() != 0) {
      byte [] largestPrefixSmallerThanQualifier = lesserOrEqualPrefixes.last();
      if (Bytes.startsWith(qualifier, largestPrefixSmallerThanQualifier)) {
        return ReturnCode.INCLUDE;
      }
      if (lesserOrEqualPrefixes.size() == sortedPrefixes.size()) {
        return ReturnCode.NEXT_ROW;
      } else {
        hint = sortedPrefixes.higher(largestPrefixSmallerThanQualifier);
        return ReturnCode.SEEK_NEXT_USING_HINT;
      }
    } else {
      hint = sortedPrefixes.first();
      return ReturnCode.SEEK_NEXT_USING_HINT;
    }

(the <hint> is used later to skip ahead to that column prefix)

See how this code snippet allows the filter to
  1. seek to the next row if all prefixes are known to be less than or equal to the current qualifier (and the largest didn't match the passed column qualifier). Note that a single seek to the next row can potentially skip millions of columns.
  2. seek to the next larger prefix if there are more prefixes, but the current one does not match the qualifier.
  3. seek to the first prefix (the smallest) if none of the prefixes is less than or equal to the current qualifier.
If you didn't feel like looking at the code, the takeaway is that these filters can be safely and efficiently used on very wide rows. If a filter could instead only indicate INCLUDE or SKIP and were forced to visit/examine every version of every column of every row, it would be inefficient for wide rows with hundreds of thousands or millions of columns.
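To see the seek-hint decision in isolation, here is a standalone sketch of the same TreeSet logic, using Strings instead of byte[] for readability. The `decide` method and the "SEEK:" encoding are my own illustration, not HBase API:

```java
import java.util.TreeSet;

public class PrefixHintDemo {
  // Mirrors the MultipleColumnPrefixFilter logic above: given the sorted
  // prefixes and the current qualifier, decide whether to include the cell,
  // skip to the next row, or seek ahead to a specific prefix.
  static String decide(TreeSet<String> sortedPrefixes, String qualifier) {
    TreeSet<String> lesserOrEqual =
        (TreeSet<String>) sortedPrefixes.headSet(qualifier, true);
    if (!lesserOrEqual.isEmpty()) {
      String largest = lesserOrEqual.last();
      if (qualifier.startsWith(largest)) {
        return "INCLUDE";
      }
      if (lesserOrEqual.size() == sortedPrefixes.size()) {
        return "NEXT_ROW";
      }
      return "SEEK:" + sortedPrefixes.higher(largest);
    }
    return "SEEK:" + sortedPrefixes.first();
  }

  public static void main(String[] args) {
    TreeSet<String> prefixes = new TreeSet<String>();
    prefixes.add("abc");
    prefixes.add("xyz");
    System.out.println(decide(prefixes, "abcdef")); // INCLUDE
    System.out.println(decide(prefixes, "bbb"));    // SEEK:xyz
    System.out.println(decide(prefixes, "zzz"));    // NEXT_ROW
    System.out.println(decide(prefixes, "a"));      // SEEK:abc
  }
}
```

Note how only the INCLUDE case ever looks at the qualifier itself; the other three cases are pure navigation, which is exactly what makes the filter cheap on wide rows.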

I'm in the process of adding more information about these Filters to the HBase Book Reference Guide.

Friday, January 27, 2012

Scanning in HBase

In HBASE-5268 I proposed a "prefix delete marker" (a delete marker that would mark a set of columns identified by a column prefix as deleted).

As it turns out, I didn't quite think through how scanning/seeking in HBase works, especially when delete markers are involved. So I thought I'd write about that here.
At the end of the article I hope you will understand why column prefix delete markers that are sorted in line with the KeyValues they affect cannot work in HBase.

This blog entry describes how deletion works in HBase. One interesting piece of information not mentioned there is that version and column delete markers are sorted in line with the KeyValues that they affect, while family delete markers are always sorted to the beginning of their row.

Generally each column family is represented by a Store, which manages one or more StoreFiles. Scanning is a form of merge-sort performed by a RegionScanner, which merges results of one or more StoreScanners (one per family), who in turn merge results of one or more StoreFileScanners (one for each file for this family):

                              RegionScanner
                             /             \
                  StoreScanner              StoreScanner
                 /            \            /            \
   StoreFileScanner  StoreFileScanner  StoreFileScanner  StoreFileScanner
           |                 |                 |                 |
       StoreFile         StoreFile         StoreFile         StoreFile

Say you performed the following actions (T is time):
  1. put:            row1, family, col1, value1, T
  2. delete family:  row1, family, T+1
  3. put:            row1, family, col1, value2, T+2
  4. delete columns: row1, family, col1, T+3
  5. put:            row1, family, col1, value3, T+4

What we will find in the StoreFile for "family" is this:
family-delete row1, T+1
row1,col1,value3, T+4
column-delete row1,col1, T+3
row1,col1,value2, T+2
row1,col1,value1, T

KeyValues are ordered in reverse chronological order (within their row and column). The family delete marker, however, always comes first in the row. That makes sense: a family delete marker potentially affects many columns in the row, so in order to allow scanners to scan forward-only, family delete markers need to be seen by a scanner first.
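The ordering above can be reproduced with a simplified comparator. This is an illustration only - HBase's real KeyValue comparator also considers family, type, and more - but it captures the two rules: family delete markers sort first, and within a column entries sort by descending timestamp:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortDemo {
  // A simplified stand-in for a KeyValue within one row and family.
  static class Entry {
    final boolean familyDelete; // family delete markers sort to the front
    final String column;        // empty for family delete markers
    final long ts;
    final String what;
    Entry(boolean fd, String column, long ts, String what) {
      this.familyDelete = fd; this.column = column; this.ts = ts; this.what = what;
    }
  }

  static final Comparator<Entry> ORDER = new Comparator<Entry>() {
    public int compare(Entry a, Entry b) {
      if (a.familyDelete != b.familyDelete)  // family deletes come first
        return a.familyDelete ? -1 : 1;
      int c = a.column.compareTo(b.column);  // then order by column
      if (c != 0) return c;
      return Long.compare(b.ts, a.ts);       // then newest timestamp first
    }
  };

  public static void main(String[] args) {
    long T = 100;
    List<Entry> store = new ArrayList<Entry>();
    store.add(new Entry(false, "col1", T,     "value1"));
    store.add(new Entry(true,  "",     T + 1, "family-delete"));
    store.add(new Entry(false, "col1", T + 2, "value2"));
    store.add(new Entry(false, "col1", T + 3, "column-delete"));
    store.add(new Entry(false, "col1", T + 4, "value3"));
    Collections.sort(store, ORDER);
    for (Entry e : store) {
      System.out.println(e.what);
    }
    // prints: family-delete, value3, column-delete, value2, value1
  }
}
```

Sorting the five actions from the example yields exactly the StoreFile layout shown above.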

That also means that
  1. even if we are only looking for a specific column, we always seek to the beginning of the row to check whether there is a family delete with a timestamp greater than or equal to the versions of the column that we are interested in. After that the scanner seeks to the column.
  2. even if we are looking for a past version of a column we have to seek to the "beginning" of the column (i.e. the potential delete marker right before it), before we can scan forward to the version we're interested in.
My initial patch for HBASE-5268 would sort the prefix delete markers just like column delete markers.

By now it should be obvious why this does not work.

The beginning of a row is a known point, and so is the "beginning" of a column. The beginning of a prefix of a column is not. So to find out whether a column is marked for deletion we would have to start at the beginning of the row and then scan forward to find all prefix delete markers. That clearly is not efficient.

My 2nd attempt placed all prefix delete markers at the beginning of the row. That technically works. But notice that a column delete marker only has to be retained by the scanner for a short period of time (until we have scanned past all versions that it affects). Prefix delete markers, on the other hand, would have to be kept in memory until we have scanned past all columns that start with the prefix. In addition, the number of prefix delete markers in a row is not bounded in principle.
Family delete markers do not have this problem because (1) the number of column families is limited for other reasons and (2) store files are per family, so all we have to remember for a family in a StoreScanner is a timestamp.

Monday, January 23, 2012

HBase intra row scanning

By Lars Hofhansl

Updated (again) Wednesday, January 25th, 2012.

As I painfully worked through HBASE-5229 I realized that HBase already has all the building blocks needed for complex (local) transactions.

What's important here is that (see my introduction to HBase):
  1. HBase ensures atomicity for operations for the same row key
  2. HBase keys have internal structure: (row-key, column family, column, ...)
The missing piece was ColumnRangeFilter. With this filter it is possible to retrieve all columns whose identifier starts with "abc", or all columns whose identifier sorts > "test". For example:

// all columns whose identifier starts with "abc"
Filter f = new ColumnRangeFilter(Bytes.toBytes("abc"), true,
                                 Bytes.toBytes("abd"), false);

// all columns whose identifier sorts after "test"
Filter f = new ColumnRangeFilter(Bytes.toBytes("test"), true,
                                 null, true);
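As an aside, the exclusive stop key "abd" in the first example is just the prefix "abc" with its last byte incremented. A hypothetical helper for that (my own, not an HBase API; it assumes the last byte is not 0xFF):

```java
public class PrefixRange {
  // Compute the smallest byte[] that sorts after every qualifier starting
  // with the given prefix, by incrementing the prefix's last byte.
  // Assumes the last byte is not 0xFF (otherwise carry handling is needed).
  static byte[] prefixEnd(byte[] prefix) {
    byte[] end = prefix.clone();
    end[end.length - 1]++;
    return end;
  }

  public static void main(String[] args) {
    byte[] end = prefixEnd("abc".getBytes());
    System.out.println(new String(end)); // abd
  }
}
```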

So this allows searching (scanning) inside a row by column identifier, just as HBase allows searching by row key.

A client application can exploit this to achieve transactions by grouping all entities that can participate in the same transaction into a single row (and single column family).
Prefixes of the column identifiers can then be used to define rows inside that group. Basically the search criteria for keys is moved one level down, to the column identifier.

Say we wanted to implement a store with transactional tables that contain rows and columns. One way to do this with HBase is as follows:
  • the HBase row-key/column-family maps to a "table"
  • a prefix of the HBase column identifier maps to a "row"
  • the rest of the HBase column identifier identifies the "column"
This is in fact similar to what Google's Megastore (pdf) does.
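For illustration, a hypothetical helper that builds the HBase column identifier from the logical row and column (the class name and the "|" separator are my choices, not part of HBase):

```java
public class KeyMapping {
  // Join the logical row and logical column into one HBase column qualifier.
  // Real code would use byte[] (e.g. HBase's Bytes.add) and would need a
  // separator that cannot occur in logical row names to keep sorting correct.
  static String qualifier(String logicalRow, String logicalCol) {
    return logicalRow + "|" + logicalCol;
  }

  public static void main(String[] args) {
    // (table, CF1, rowA|column1) -> value for column1 in rowA
    System.out.println(qualifier("rowA", "column1")); // rowA|column1
  }
}
```

The separator choice matters: all qualifiers for one logical row must sort contiguously, which is what makes the prefix scans below work.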

This leads to potentially wide HBase rows with many columns. The missing piece is allowing a Scan to efficiently retrieve a slice of a wide row.

This is where ColumnRangeFilter comes into play. This filter seeks efficiently into the row by seeking ahead to the first HBase block that contains the first KeyValue (or cell) for that column.

Let's model a table "pets" this way. And let's say a pet has a name and a species. The HBase key for entries would look like this:
(table, CF1, rowA|column1) -> value for column1 in rowA
The code would look something like this:
(apologies for the initial incorrect code that I had posted here)

HTable t = ...;
Scan s = ...;
// get all columns for my pet "fluffy".
Filter f = new ColumnRangeFilter(Bytes.toBytes("fluffy"), true,
                                 Bytes.toBytes("fluffz"), false);
s.setFilter(f);
s.setBatch(20); // avoid getting all columns for the HBase row
ResultScanner rs = t.getScanner(s);
for (Result r =; r != null; r = {
  // r will now have all HBase columns that start with "fluffy",
  // which would represent a single row
  for (KeyValue kv : r.raw()) {
    // each kv represents - the latest version of - a column
  }
}

The downside of this approach is that HBase achieves atomicity by collocating all cells with the same row-key, so such a row has to be hosted by a single region server.

Tuesday, January 17, 2012

HBase (Intra) Row Transactions

As said in a previous post "HBase ensures that all new versions created by single Put operation for a particular rowkey are either all seen by other clients or seen by none."

Indeed HBase can execute atomic Put operations and atomic Delete operations (as well as a few specialized operations like Increment and Append).

What HBase cannot currently do is to execute a grouping of different operations atomically. For example you cannot execute a Put and Delete operation atomically.

HBASE-3584 and HBASE-5203 change that. It is now possible to group multiple Puts and Deletes for the same row key together as a single atomic operation. The combined operation is atomic even when the executing regionserver fails half way through the operation.

The client facing API looks like this:

HTable t = ...;
byte[] row = ...;
RowMutation arm = new RowMutation(row);
Put p = new Put(row);
// ... add columns to p ...
Delete d = new Delete(row);
// ... add columns to d ...
arm.add(p);
arm.add(d);
t.mutateRow(arm);

RowMutation implements the Row interface and can hence itself be part of a multi row batch operation:

HTable t = ...;
byte[] row1, row2;
RowMutation arm1 = new RowMutation(row1);
RowMutation arm2 = new RowMutation(row2);
List<Row> rows = ...;
rows.add(arm1);
rows.add(arm2);
t.batch(rows);

But note that this multi row batch is not atomic between different rows.

Wednesday, January 4, 2012

Replication for HA and DR

Disaster Recovery (DR) is an essential part of any storage solution. There are many aspects of this: local backups, remote backups, and cross data center replication.
I will tackle backups in another blog post, here I will write about replication.

Since I prefer to use built-in features rather than rolling our own internal add-ons, over the past few months I have added various features to HBase that I believe are needed for DR with HBase. These are:
  • HBASE-4071 minimum versions with TTL
  • HBASE-4536 option to keep deleted cells and markers
  • HBASE-2195 (cyclic replication; master <-> master is a special case of that)
  • HBASE-2196 (replication to multiple slave clusters)
With these in place it is possible to set up slave cluster(s) in other data centers and use them for DR purposes.

Disaster here means:
  • The data center sinks into the ocean.
    Note that replication is asynchronous. If an entire data center - along with all local backups - is lost, there will be a window during which some data is lost.
But once you have a replica it can be used for the following scenarios as well:
  • The customer deleted some data by accident (even though that means they performed and confirmed a soft delete and then performed and confirmed a hard delete... Still, it happens and did happen).
  • The application software deletes some data by accident.

Replication can help as follows:
  • Set up the main cluster as usual.
  • Set up an identical slave cluster (same tables, same config)
  • Enable replication between the two
This would be a nice way to simply let the slave take over if the master fails completely. In that case it might be prudent to set up master-master replication between the two.

Or maybe you want to be able to restore the state of (say) the past 48 hours without expensive and time-consuming restores from (gasp) tape:

  • Set up the main cluster as usual.
  • Set up a slave in the same way, but set the TTL for all tables to 48h and minimum versions to 1 (HBASE-4071), along with a reasonably large setting for maximum versions (MAX_INT to be correct).
  • Enable replication.
Now the slave cluster will retain the data for at least 48h, and always keep one version of the data around. If any accidental modification is detected within 48h the data can be recovered (although there is no way to stop the clock).

Or say you wanted to guard against accidental deletes as well.
  • Set up the main cluster as usual.
  • As before, set TTL and min/max versions in the slave cluster's tables.
  • Also enable "keep deleted cells" (see HBASE-4536) on all slave tables
  • Enable replication.
Now the slave cluster will also keep deleted cells (and all needed delete markers) around for 48h, so that full point-in-time queries are possible: one can look at the state of a table as of a specific time, guarding against accidental modifications and deletes.
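For concreteness, the slave-side table settings from the last two scenarios might be declared like this through the Java admin API (a sketch; setMinVersions comes with HBASE-4071 and setKeepDeletedCells with HBASE-4536, and "family" stands in for your column family name):

```java
// Sketch: column family settings for a 48h point-in-time slave table.
HColumnDescriptor hcd = new HColumnDescriptor("family");
hcd.setTimeToLive(48 * 60 * 60);       // 48 hours, in seconds
hcd.setMinVersions(1);                 // always retain at least one version
hcd.setMaxVersions(Integer.MAX_VALUE); // do not prune versions early
hcd.setKeepDeletedCells(true);         // retain deleted cells and delete markers
```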

Maybe you even want multiple slave clusters: one to take over in case of failure, one to guard against accidental deletes, one with many cheap disks, etc.