GigaSpaces And Storm Part 1 – Storm Clouds

Introduction

Real-time processing is becoming very popular, and Storm is a popular open source framework and runtime used by Twitter for processing real-time data streams.  Storm addresses the complexity of running real time streams through a compute cluster by providing an elegant set of abstractions that make it easier to reason about your problem domain by letting you focus on data flows rather than on implementation details.  The framework takes your abstract flow definition and optimizes the deployment characteristics in the cluster to optimize throughput.

This post is part of a series about GigaSpaces and Storm.  This series will span both GigaSpaces products, XAP and Cloudify.  In this post I outline Storm itself for those that may be unfamiliar with it, and then the related Cloudify recipes I’ve created thus far.  The net product is a highly available, elastic, cloud-portable Storm cluster with a single management dashboard.   In a future post I’ll show how we can optimize Storm performance by interfacing it with our in-memory data grid (XAP).  For now, first a brief introduction to Storm, and then a discussion of the recipes. This introduction is not meant be an exhaustive explanation of Storm’s features (which is found at “http://storm-project.net”:http://storm-project.net.

Storm in a Nutshell

Storm is a real time, open source data streaming framework that functions entirely in memory.  It constructs a processing graph that feeds data from an input source through processing nodes.  The processing graph is called a “topology”.  The input data sources are called “spouts”, and the processing nodes are called “bolts”.  The data model consists of tuples.  Tuples flow from Spouts to the bolts, which execute user code. Besides simply being locations where data is transformed or accumulated, bolts can also join streams and branch streams.  Storm is designed to be run on several machines to provided parallelism.  Storm topologies are deployed in a manner somewhat similar to a webapp or a XAP processing unit; a jar file is presented to a deployer which distributes it around the cluster where it is loaded and executed.  A topology runs until it is killed.

Nodes in the diagram do not imply physical or even VM boundaries.  Storm’s execution planning will avoid unnecessary network hops, combining operations in a single node where appropriate.

Data Flow In Storm

Storm processes streams of tuples.  A stream is defined to be an unlimited ordered sequence of tuples, and tuples are one dimensional arrays of values.  The values themselves can be any java primitive type, and can also extend to arbitrary types with the use of custom serializers.

A Super Simple Example Topology

To understand Storm, we’ll start from the simplest possible (meaningful) topology, and grow from there. Using the age old twitter example, imagine a stream of tweets, and a desire to count the number of words each user posts in real time.  If you recall from above, topologies begin by reading tuples from a “spout”.  The simplest kind of spout is an “unreliable” spout, so we’ll consider that here.  An “unreliable” spout is one that can’t replay tuples: it is fire and forget.  Since we’re predicting that our storm infrastructure is reliable (and failures rare), and we’re just counting for statistical reasons, we can tolerate an unreliable spout.  So we implement an unreliable spout that can provide tuples to Storm.  Our tuple will simply be two strings: { tweeter, tweet }. When our topology is deployed, the framework begins polling the Spout for tuples.  Each tuple will be fully processed before the next is read.

So now we have tweets streaming in.  Storm operates by polling, so when it is ready for a new tuple, it calls our spout and asks for one.  If we have no “bolt” defined, nothing else happens.  Since we wanted to track word count per user, we need a bolt.  Our bolt will be fed each tuple by the framework.  The bolt will simply contain a data structure (Map) that stores the word count for each user in memory.  These counts will be updated as tuples arrive.  Then the next tuple is read by the framework and so on.

 

An Improvement : Batching

What if I have a longer pipeline of stream processors/bolts?  It’s rather inefficient to read and process only one tuple at a time.  To overcome this, Storm supports the concept of “batch” spouts.  Batch spouts read batches of tuples.  The batch fails or succeeds as a group.  If the spout can keep up, and failures are rare, the batching strategy will be far more performant than reading one tuple at a time.  At the same time, having unreliable spouts, as we did in the first example, becomes more problematic: where dropping an occassional tuple on a failure is tolerable, maybe losing 1000 when a single one fails might not be.  Thus the demand for reliable spouts grows.  A reliable spout is one that can replay a tuple or batch when a error occurs.

Another Improvement : Persistent State

Storm stores its operational state in Zookeeper, an in-memory cluster coordination service. This is operational state however, not the state of individual tuple batches, or topology specific state.  When we revisit the bolt in the example topology above, it accumulates state in an embedded data structure in memory.  Naturally, this is volatile and probably undesirable in the real world.  Storm provides a capability for reliably using sources of state that are external to Storm itself.  More details on this later.  Typically, this is via a data store (NoSQL) or memory (memcached).  In this way, state information can be queried by external observers of the stream processing.

The Cloudify recipes

I’ve built 3 Cloudify recipes to Cloudify Storm. These recipes don’t address any services external to Storm itself (e.g. persistence).  The services are:

  •  zookeeper – Storm uses Zookeeper to communicate between the “Nimbus”(master) and the ‘Supervisors” (workers), as well as to store its current state.
  • storm-nimbus – The topology execution coordinator for the cluster.  The Nimbus is a singleton in the cluster (i.e. not elastic).  It is stateless however (due to storing state in Zookeeper) and there for can fail and be restarted without consequence even to running jobs.
  • storm-supervisor – The supervisors actually run the topology code.  There can/should be many of these (i.e. elastic).  The parallelism attributes of a given topology are specified in the topology itself.

Zookeeper : Recipe Details

The source for this recipe can be found at github: https://github.com/CloudifySource/cloudify-recipes/tree/master/services/zookeeper.  Zookeeper coodinates activity in the cluster, and provides operational state storage.  An interesting note is that due it’s leader election algorithm, it needs to always have an odd number of instances.  So if you want HA, the fewest you can have is 3.  The recipe is only functional on *nix (only tested on Linux).  It should also be noted that Zookeeper is not elastic by design: there is no way to add nodes at runtime.  There are probably 3 interesting things about the Zookeeper recipe that make it standout: the install, templating, and the monitor.

The install

The Zookeeper install is interesting because Zookeeper requires, as a prerequisite to starting, a file listing all the other Zookeeper nodes in it.  This is interesting, because on a Cloud we have no idea what the hosts/IPs are prior to starting the VM.  The key is to wait for all the planned instances to be available by using the service context.  The important thing to understand is that the context returns the instances as soon as the VM has started: it doesn’t mean that the instance has actually been through its “start” lifecycle phase.  Another little gotcha is that each IP in the Zookeeper host list has to have a number attached, and that number->IP relationship must be the same on every node.  The simple solution was to just sort the list prior to assigning the numbers (1..n).

Templating

This is a handy pattern for creating templates of script and configuration files used in a service.  The Zookeeper service has a templates subdirectory that contains various configuration and script files that require modification prior to service startup.  They use Groovy templating.  The most interesting example is the zoo.cfg file, which contains the list of Zookeeper host IPs mentioned previously:

<% hosts.eachWithIndex { host,i -> %>server.${i+1}=${host}:2222:2223
<% } %>

If you’re familiar with JSP syntax (or ASP for that matter), the above will look familiar.  Text inside <% %> pairs is interpreted as groovy code, and text inside ${} is treated as groovy vars.  Inside the recipe itself, the value of the variable “host” is provided and the template evaluated like so:

1
2
3
4
def binding=[“hosts”:ips,”clientPort”:”${config.clientPort}”]
def zoo = new File(‘templates/zoo.cfg’)
engine = new SimpleTemplateEngine()
template = engine.createTemplate(zoo).make(binding)

The “clientPort” binding is ignored by the template, but that is irrelevant.  The output of this template is a config file with a properly formatted list of IPs, something like:

server.1=10.10.10.10

server.2=10.10.10.11

server.3 .....

The Monitor

The monitor is simple, but a nice example about how Cloudify monitoring should be thought of as simply returning a map of key/value pairs, and not in terms of JMX or any particular technology.  This simplicity is a great strength.  It turns out that Zookeeper will reveal it’s runtime status by opening a socket and writing a string to it.

The output on the socket has interesting info about messaging traffic.  The string that Zookeeper responds to is “stat”, and the monitor is a simple bash script:

1
echo stat |nc 127.0.0.1 $PORT |awk -F : ‘{gsub(/ */,””);if ($1 == “Received” || $1 == “Sent” || $1 == “Outstanding”) print $2}’

Basically, “stat” is echo’d to the Zookeeper port on the service instance.  The resulting response string is fed to awk, where it is filtered.  The resulting list of three numbers is returned to the recipe and a map is created from it:

1
2
3
4
5
6
7
8
def metrics=[]
def process=”${context.serviceDirectory}/stat.sh ${clientPort}”.execute()
process.in.eachLine{line->
metrics.add line
}
[ “Packets Received”:metrics[0],”Packets Sent”:metrics[1],”Outstanding Requests”:metrics[2]]

Storm Nimbus – Recipe Details

The source for this recipe can be found at github: https://github.com/CloudifySource/cloudify-recipes/tree/master/services/storm/storm-nimbusThe Storm Nimbus is basically a superset of the Storm Supervisor recipe, so I’ll just go over a few interesting details of storm-nimbus.

The Install

Like the Zookeeper recipe above, the storm-nimbus uses Groovy templating to modify a configuration file supplied in the “templates” directory.  At the beginning of the install, storm-nimbus (which depends on Zookeeper running) gets the Zookeeper services instances from the service context.  This list is iterated over in the storm.yaml config file:

1
2
3
4
5
storm.zookeeper.servers:
<% zooks.each { zook ->
println ” – “${zook.hostAddress}””
} %>

But the biggest thing going on in the install is an actual build of a native library. Storm depends on the zeromq library for network communication, but it is a native library that requires a build.  Both the storm-nimbus and storm-supervisor recipes perform this build.  This was intended on a catch all approach for making a generic demo, NOT as a recommended approach.  It slows down install greatly.  In the “real” world, please have pre-built binaries in a blog store and download them.

In any case, this build task causes the recipe to only run on Linux distros that support “yum”, which is used to pull down various dependencies such as python, gcc, make, etc..  It also pulls from github and builds jzmq.  If you’re interested, check out the “initnode.sh” script in the recipe.

Monitoring

To monitor the Nimbus, I wrote a java plugin for Cloudify.  Why not Groovy?  Personally, beyond a certain threshold of complexity, I find java development environments (i.e. Eclipse) to be more productive.  Having said that, porting it to Groovy would clean things up greatly.  As it is, the recipe requires several jar files in the usmlib directory to support it (thift, etc…).   The plugin itself is unremarkable: it simply calls the Thrift interface and get the Nimbus configuration and extracts some key metrics.  It is hard coded, and obviously should be configurable to select metrics of interest.

Custom Commands

There a few custom commands that can’t be explained until the XAP integration is discussed in future posts.  These commands deal with running and kill topologies, and are tailored to the XAP integration.  Generic topology management commands could be adapted from them however.

Conclusion

These recipes have been tested on EC2 and HPCS/Openstack.  There is an application recipe under app/storm in the cloudify-recipes repo.  When you run it, give a healthy timeout, since the install runs yum fetches and builds for the native libs. In my next post, I’ll discuss the rationale for combining GigaSpaces XAP with Storm, and share the implementation of components to support it.



Leave a Reply