HBase Trident
Trident is a high-level abstraction for Storm that allows you to perform stateful stream processing and low-latency distributed querying. Trident allows you to perform joins, aggregations, grouping, functions, and filters on streaming data, in much the same way as Pig and Cascading allow you to perform these operations in Hadoop. For more information on Trident, please see the Trident tutorial.
Trident also allows you to read from and write to stateful sources, which can either be internal to the topology (e.g. kept in memory) or stored externally in a database (see Trident state). The storm-hbase connector provides two generic Trident state implementations to persist state in HBase:
- HBaseValueState: A Trident state implementation for putting and getting values from an HBase table. Because Trident processes Tuples in batches, and the state is often time-critical, HBaseValueState flushes all Puts for a batch to ensure they do not get stuck in HBase's client-side write buffer, although the write buffer should still be enabled so that Puts are not sent individually.
- HBaseAggregateState: A Trident persistentAggregate implementation for updating aggregations in HBase. It expects the Trident topology to group the Tuples by the HBase table's rowKey, columnFamily, and columnQualifier to form the keys in the state; the aggregation results form the values (see the HBaseAggregateState example below). As above, all Puts for a batch are flushed immediately to ensure they do not get stuck in HBase's client-side write buffer.
TridentConfig is an extension of TupleTableConfig (as described on the HBase Storm Bolts page) and, in addition to its attributes, the following Trident-specific attributes can be set:
- stateCacheSize: Sets the size of the in-memory cache that sits above HBase to minimise the communication between Storm and HBase. Default size is 1,000.
- stateSerializer: The serializer to use for serializing/deserializing the state value in HBase. This does not normally need to be set and will be determined based on the configured state semantics - e.g. non-transactional, transactional, or opaque.
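For illustration, here is a minimal sketch of overriding these attributes. The setter names setStateCacheSize and setStateSerializer are assumptions based on the attribute names above, while JSONTransactionalSerializer is Trident's built-in transactional serializer (storm.trident.state):
TridentConfig config = new TridentConfig("shorturl", "shortid");
// Assumed setter names, mirroring the attribute names above
config.setStateCacheSize(5000); // default is 1,000
config.setStateSerializer(new JSONTransactionalSerializer()); // normally inferred from the state semantics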
Two example topologies are provided to demonstrate these state implementations:
- HBaseTridentValueTopology: A Trident topology that demonstrates how to use HBaseValueState to put streaming data into HBase.
- HBaseTridentAggregateTopology: A Trident topology that demonstrates how to use HBaseAggregateState to compute incremental counts for keys and persist their state in HBase.
The example topologies are based on the URL shortener project (Hush), taken from the HBase: The Definitive Guide book.
Create the HBase table (assumes you have HBase installed and configured):
hbase> create 'shorturl', {NAME => 'data', VERSIONS => 3}, {NAME => 'daily', VERSIONS => 1, TTL => 604800}, {NAME => 'weekly', VERSIONS => 1, TTL => 2678400}, {NAME => 'monthly', VERSIONS => 1, TTL => 31536000}
To build a tarball that includes the storm-hbase .jar and its dependencies for running the examples:
$ mvn assembly:assembly
Copy the .tar.gz to a machine that you typically use to run the HBase client (e.g. has the required hbase-site.xml) and unpack it:
$ scp target/storm-hbase-[version].tar.gz user@hbase.client.machine:/home/user
$ ssh user@hbase.client.machine
$ tar -xvzf storm-hbase-[version].tar.gz
To run the topology:
java -cp storm-hbase-[version]/lib/*:/path/to/hbase/conf backtype.storm.contrib.hbase.examples.HBaseTridentValueTopology
The /path/to/hbase/conf directory should contain your hbase-site.xml.
With the following TridentConfig configuration:
TridentConfig config = new TridentConfig("shorturl", "shortid");
config.setBatch(false);
config.addColumn("data", "url");
config.addColumn("data", "user");
config.addColumn("data", "date");
Your 'shorturl' table should look something like this:
hbase> scan 'shorturl'
ROW COLUMN+CELL
http://atmlb.com/7NG4sm column=data:date, timestamp=1349709372291, value=20120816
http://atmlb.com/7NG4sm column=data:url, timestamp=1349709372291, value=baltimore.orioles.mlb.com/
http://atmlb.com/7NG4sm column=data:user, timestamp=1349709372291, value=kinley
http://bit.ly/2VL7eA column=data:date, timestamp=1349709372258, value=20120816
http://bit.ly/2VL7eA column=data:url, timestamp=1349709372258, value=www.49ers.com/
http://bit.ly/2VL7eA column=data:user, timestamp=1349709372258, value=kinley
http://bit.ly/9ZJhuY column=data:date, timestamp=1349709372291, value=20120816
http://bit.ly/9ZJhuY column=data:url, timestamp=1349709372291, value=www.buccaneers.com/index.html
http://bit.ly/9ZJhuY column=data:user, timestamp=1349709372291, value=kinley
http://bit.ly/LsaBa column=data:date, timestamp=1349709372258, value=20120816
http://bit.ly/LsaBa column=data:url, timestamp=1349709372258, value=www.baltimoreravens.com/
http://bit.ly/LsaBa column=data:user, timestamp=1349709372258, value=kinley
http://bit.ly/ZK6t column=data:date, timestamp=1349709372258, value=20120816
http://bit.ly/ZK6t column=data:url, timestamp=1349709372258, value=www.arsenal.com/home
http://bit.ly/ZK6t column=data:user, timestamp=1349709372258, value=kinley
The HBaseTridentAggregateTopology demonstrates how to use HBaseAggregateState with Trident's each, project, groupBy, and persistentAggregate functions to calculate daily, weekly, and monthly counters for each key in a stream and use HBase to persist their state.
StateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
topology
.newStream("spout", spout)
.each(new Fields("shortid", "date"), new DatePartitionFunction(),
new Fields("cf", "cq"))
.project(new Fields("shortid", "cf", "cq"))
.groupBy(new Fields("shortid", "cf", "cq"))
.persistentAggregate(state, new Count(), new Fields("count"));
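HBaseAggregateState.transactional(config) stores a transaction ID alongside each aggregate to support exactly-once semantics. If the connector mirrors Trident's built-in map states, the other two semantics should be available from analogous factory methods; nonTransactional and opaque are assumed names here:
// Assumed factory method names, by analogy with Trident's built-in map states
StateFactory nonTransactional = HBaseAggregateState.nonTransactional(config);
StateFactory opaque = HBaseAggregateState.opaque(config);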
Given the example Tuples:
shortid url user date
"http://bit.ly/ZK6t", "www.arsenal.com/home", "kinley", "20120816"
"http://bit.ly/LsaBa", "www.baltimoreravens.com/", "kinley", "20120816"
"http://bit.ly/2VL7eA", "www.49ers.com/", "kinley", "20120816"
"http://bit.ly/9ZJhuY", "www.buccaneers.com/index.html", "kinley", "20120816"
"http://atmlb.com/7NG4sm", "baltimore.orioles.mlb.com/", "kinley", "20120816"
"http://bit.ly/ZK6t", "www.arsenal.com/home", "kinley", "20120817"
"http://bit.ly/ZK6t", "www.arsenal.com/home", "kinley", "20120817"
"http://bit.ly/ZK6t", "www.arsenal.com/home", "kinley", "20120817"
"http://bit.ly/LsaBa", "www.baltimoreravens.com/", "kinley", "20120820"
"http://bit.ly/LsaBa", "www.baltimoreravens.com/", "kinley", "20120820"
"http://bit.ly/LsaBa", "www.baltimoreravens.com/", "kinley", "20120903"
First, because we need to compute daily, weekly, and monthly counters for each key, we need to explode the stream so that each period can be grouped and aggregated independently. The each function is used for this because it takes a single Tuple as input and emits zero or more Tuples as output. For example, given the Tuple:
"http://bit.ly/ZK6t", "20120816"
The DatePartitionFunction will output the following three Tuples:
"http://bit.ly/ZK6t", "20120816", "daily", "20120816"
"http://bit.ly/ZK6t", "20120816", "weekly", "201233"
"http://bit.ly/ZK6t", "20120816", "monthly", "201208"
The two new fields appended to the Tuple define the column family (CF) and column qualifier (CQ) to use when updating the key's state in HBase.
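The project's DatePartitionFunction source is not reproduced on this page, but a minimal sketch of an equivalent Trident function follows (week-of-year handling at year boundaries is simplified):
import java.text.SimpleDateFormat;
import java.util.Calendar;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class DatePartitionFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String date = tuple.getStringByField("date"); // e.g. "20120816"
        try {
            Calendar cal = Calendar.getInstance();
            cal.setTime(new SimpleDateFormat("yyyyMMdd").parse(date));
            // Each emit appends the new ("cf", "cq") fields to the input Tuple
            collector.emit(new Values("daily", date));
            collector.emit(new Values("weekly", String.format("%s%02d",
                date.substring(0, 4), cal.get(Calendar.WEEK_OF_YEAR))));
            collector.emit(new Values("monthly", date.substring(0, 6)));
        } catch (java.text.ParseException e) {
            // Emitting nothing filters out Tuples with malformed dates
        }
    }
}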
Next, the project function keeps only the fields required to compute the counter increments and update the state ("shortid", "cf", and "cq").
Next, the groupBy function groups all Tuples with the same key ("shortid", "cf", and "cq") together so that the counter increments are computed accurately.
Finally, the persistentAggregate function uses HBaseAggregateState with the Count aggregator to compute the counter increment for each key ("shortid", "cf", and "cq") and update its state in HBase.
To run the topology:
java -cp storm-hbase-[version]/lib/*:/path/to/hbase/conf backtype.storm.contrib.hbase.examples.HBaseTridentAggregateTopology
The /path/to/hbase/conf directory should contain your hbase-site.xml.
Your 'shorturl' table should look something like this:
hbase> scan 'shorturl'
ROW COLUMN+CELL
http://atmlb.com/7NG4sm column=daily:20120816, timestamp=1349716046299, value=[2,1]
http://atmlb.com/7NG4sm column=monthly:201208, timestamp=1349716046300, value=[2,1]
http://atmlb.com/7NG4sm column=weekly:201233, timestamp=1349716046297, value=[2,1]
http://bit.ly/2VL7eA column=daily:20120816, timestamp=1349716046274, value=[1,1]
http://bit.ly/2VL7eA column=monthly:201208, timestamp=1349716046276, value=[1,1]
http://bit.ly/2VL7eA column=weekly:201233, timestamp=1349716046276, value=[1,1]
http://bit.ly/9ZJhuY column=daily:20120816, timestamp=1349716046300, value=[2,1]
http://bit.ly/9ZJhuY column=monthly:201208, timestamp=1349716046300, value=[2,1]
http://bit.ly/9ZJhuY column=weekly:201233, timestamp=1349716046301, value=[2,1]
http://bit.ly/LsaBa column=daily:20120816, timestamp=1349716046277, value=[1,1]
http://bit.ly/LsaBa column=daily:20120820, timestamp=1349716046345, value=[4,2]
http://bit.ly/LsaBa column=daily:20120903, timestamp=1349716046346, value=[4,1]
http://bit.ly/LsaBa column=monthly:201208, timestamp=1349716046354, value=[4,3]
http://bit.ly/LsaBa column=monthly:201209, timestamp=1349716046353, value=[4,1]
http://bit.ly/LsaBa column=weekly:201233, timestamp=1349716046276, value=[1,1]
http://bit.ly/LsaBa column=weekly:201234, timestamp=1349716046353, value=[4,2]
http://bit.ly/LsaBa column=weekly:201236, timestamp=1349716046355, value=[4,1]
http://bit.ly/ZK6t column=daily:20120816, timestamp=1349716046278, value=[1,1]
http://bit.ly/ZK6t column=daily:20120817, timestamp=1349716046319, value=[3,3]
http://bit.ly/ZK6t column=monthly:201208, timestamp=1349716046318, value=[3,4]
http://bit.ly/ZK6t column=weekly:201233, timestamp=1349716046329, value=[3,4]
Here the value is the serialized state of the key, which in this example includes the latest transaction ID and the count (e.g. value=[2,1] means the count is 1 and it was last written by transaction 2). The transaction ID is used by Trident to guarantee exactly-once processing semantics.
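For example, a key's state can be read back and decoded outside the topology with a plain HBase Get. This sketch assumes the state was written with Trident's transactional JSON serializer and uses the HBase client API of the time:
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import storm.trident.state.JSONTransactionalSerializer;
import storm.trident.state.TransactionalValue;

public class ReadCounter {
    public static void main(String[] args) throws Exception {
        HTable table = new HTable(HBaseConfiguration.create(), "shorturl");
        Get get = new Get(Bytes.toBytes("http://bit.ly/ZK6t"));
        get.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("20120816"));
        byte[] cell = table.get(get).getValue(Bytes.toBytes("daily"),
            Bytes.toBytes("20120816"));

        // Decodes the serialized [txid, count] pair, e.g. [1,1]
        TransactionalValue value = new JSONTransactionalSerializer().deserialize(cell);
        System.out.println("txid=" + value.getTxid() + ", count=" + value.getVal());
        table.close();
    }
}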