Storm
Requirements:
- At least 4 EC2 instances (see AWS Intro)
- Passwordless SSH on each node as described in the Hadoop Intro Dev
- Zookeeper
Storm requires Java. If Java is not installed on all the machines, install with the following command
all-nodes:~$ sudo apt-get update
all-nodes:~$ sudo apt-get install openjdk-7-jdk
You can check if the installation was successful by typing the following
all-nodes:~$ which java
/usr/bin/java
all-nodes:~$ java -version
java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-0ubuntu0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)
This installation process must be executed on the “master” node (which Storm calls Nimbus) AND all the workers.
We will grab the Storm 0.9.5 version and save it to a Downloads folder. Next we will install it into our /usr/local directory and rename the folder to simply ‘storm’
node:~$ wget http://mirrors.gigenet.com/apache/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz -P ~/Downloads
node:~$ sudo tar zxvf ~/Downloads/apache-storm*.gz -C /usr/local
node:~$ sudo mv /usr/local/apache-storm* /usr/local/storm
Add the following environment variables to the ~/.profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
Be sure to source the profile
node:~$ . ~/.profile
Create a local directory on each node for storing state and give it the proper permission
node:~$ sudo mkdir /usr/local/storm/local_state
node:~$ sudo chown ubuntu /usr/local/storm/local_state
Edit the /usr/local/storm/conf/storm.yaml file on all the nodes (don’t forget to remove the #’s on the left of the relevant lines)
node:~$ sudo nano /usr/local/storm/conf/storm.yaml
Find and uncomment the following lines, replacing the placeholder values with the public DNS names for your cluster
########### These MUST be filled in for a storm configuration
# place the public dns for all the nodes with zookeeper installed
storm.zookeeper.servers:
    - "<zookeeper-public-dns-1>"
    - "<zookeeper-public-dns-2>"
    - "<zookeeper-public-dns-3>"
    - "<zookeeper-public-dns-4>"
#
nimbus.host: "<master-public-dns>"
Add the following lines below the previous lines
storm.local.dir: "/usr/local/storm/local_state"
#
ui.port: 8090
#
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
#
This last set of options tells the supervisor the ports for worker tasks. By default, there are 4 worker tasks running on each worker node, using ports 6700-6703 to communicate. Adding more ports (e.g. 6704) would mean there are more worker tasks launched on that worker machine.
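For example, a larger worker could be given a fifth slot simply by listing one more port (a sketch; port 6704 is illustrative, and the number of slots should match the machine's cores and memory):
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
    - 6704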
Note: The port of the Storm UI (e.g. 8090) is chosen to avoid collisions with other UIs, like Spark.
Prior to starting Storm, make sure that Zookeeper is running correctly as described in the Zookeeper Dev, which you can check with:
all-nodes:~$ echo srvr | nc localhost 2181
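If Zookeeper is up, you should see status output roughly like the following (the version, counters, and Mode will differ on your cluster; Mode can be leader, follower, or standalone):
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/8
Received: 112
Sent: 111
Connections: 1
Outstanding: 0
Zxid: 0x200000003
Mode: follower
Node count: 20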
If you don’t see an output, start Zookeeper with:
all-nodes:~$ sudo /usr/local/zookeeper/bin/zkServer.sh start
In the more advanced Dev, we’ll run Storm with supervision, but it’s good to start it manually to ensure that it’s working properly.
Storm uses three types of daemons (background processes) to run networks of computations (known as topologies):
- Nimbus that coordinates work from the “master” node
- Supervisors that process the data on the “worker” nodes
- The UI which runs the WebUI on the “master” node
Start Nimbus on the master node as a background process (or use tmux as shown in the Advanced Kafka Dev)
master_node:~$ sudo $STORM_HOME/bin/storm nimbus &
Start the Storm Supervisor on each worker node as a background process (or use tmux)
worker_node:~$ sudo $STORM_HOME/bin/storm supervisor &
Start the WebUI on the master node as a background process (or use tmux)
master_node:~$ sudo $STORM_HOME/bin/storm ui &
You can go to master-public-dns:8090 in your browser to check if it’s working
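You can also check from the command line with jps (since the daemons were started with sudo, run jps with sudo as well); the master should list nimbus and core (the UI process), and each worker should list supervisor:
master_node:~$ sudo jps
worker_node:~$ sudo jps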
When you’re done, you can bring these processes to the foreground with the fg command and exit them with CTRL+c.
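For example, a minimal sketch on the master (the job number depends on what else is running in your shell):
master_node:~$ jobs
master_node:~$ fg %1   # then press CTRL+c to stop Nimbus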
The typical way to develop Storm programs (and many other distributed technologies) is to first build a topology locally on a single machine, then deploy that topology to the cluster. This can be done with a variety of tools, but the standard way is to use the Java build tool Maven (even if your topology is written in another language like Python). There are also tools, such as Parse.ly’s streamparse and Yelp’s Pyleus, that build and deploy topologies entirely in Python.
First download the latest version of Maven to a single machine - this could be your local machine or one of the nodes (for this Dev, the directories will correspond to an Ubuntu system). You can get the latest download link from here, but as of this writing it can be downloaded with:
single-machine:~$ wget http://www.gtlib.gatech.edu/pub/apache/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz -P ~/Downloads
single-machine:~$ sudo tar zxvf ~/Downloads/apache-maven*.gz -C /usr/local
single-machine:~$ sudo mv /usr/local/apache-maven* /usr/local/maven
Add the bin of this directory to your PATH by opening your profile file (e.g. ~/.profile on Ubuntu, ~/.bash_profile on OS X) and adding the lines
export MAVEN_HOME=/usr/local/maven
export PATH=$PATH:$MAVEN_HOME/bin
Be sure to source the appropriate profile file
single-machine:~$ . ~/.profile
Check your installation of Maven with:
single-machine:~$ mvn -v
You should see something like the output below (but specific to your system)
Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 2014-12-14T12:29:23-05:00)
Maven home: /usr/local/Cellar/maven/3.2.5/libexec
Java version: 1.7.0_71, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "10.10.5", arch: "x86_64", family: "mac"
The Storm source code comes with a nice “storm-starter” to run some simple topology examples. Go to the Storm release page and download the source code for the version that matches what’s installed on your cluster. Warning: the latest release on GitHub is often a newer beta version of Storm; make sure you get the matching, stable version (e.g. 0.9.5)!
single-machine:~$ wget https://github.com/apache/storm/archive/v0.9.5.tar.gz -P ~/Downloads
single-machine:~$ tar zxvf ~/Downloads/v0.9.5.tar.gz -C ~/
Maven is pretty simple to use - you define all your program’s dependencies in a Project Object Model XML file (pom.xml), and Maven will automatically download these dependencies when you build your project. Now build Storm from source on your local machine by running Maven within the Storm top-level directory (this may take a few minutes depending on your internet speed, but you should see a BUILD SUCCESS at the end).
single-machine:~$ cd storm-0.9.5
single-machine:~$ mvn clean install -DskipTests=true
This command cleans any conflicting dependency JARs from the ~/.m2 Maven directory, then installs Storm. As described here, the -D option sets a system property so that Storm is built without running its tests. You can see the specific dependencies that were built by looking at the pom.xml file.
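Your own topology projects follow the same pattern: the pom.xml declares Storm itself as a dependency. A minimal sketch of that entry for this version (the provided scope assumes the jar will ultimately run on a cluster that already has Storm on its classpath):
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.5</version>
    <scope>provided</scope>
</dependency>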
Now compile and execute a specific topology, the WordCountTopology, using Maven in the storm-starter directory. (Note: on Windows you’ll need quotes, i.e. “-Dstorm...WordCountTopology”.)
single-machine:~$ cd examples/storm-starter/
single-machine:~$ mvn compile exec:java -Dstorm.topology=storm.starter.WordCountTopology
This runs WordCount locally with randomly generated words for 10 seconds before shutting down the topology.
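The switch between this local run and the cluster submission shown further below happens in the topology’s main method. Roughly (simplified from the storm-starter source; see WordCountTopology.java for the exact code):
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
    // a topology name was passed on the command line, so submit to the cluster ("remote" mode)
    conf.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
    // no arguments: run in an in-process LocalCluster for ~10 seconds, then shut down
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("word-count", conf, builder.createTopology());
    Thread.sleep(10000);
    cluster.shutdown();
}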
To deploy this on your cluster, package up the storm-starter and all the dependencies into an “uberjar” (also known as a “fat jar”).
single-machine:~$ mvn package
This places the compiled jar into the target directory - use scp to transfer it to your nimbus (master) node, substituting your own key file and the master’s public DNS in the command below.
single-machine:~$ scp -i ~/.ssh/<your-key>.pem ~/storm-*/examples/storm-starter/target/storm-starter-*-jar-with-dependencies.jar ubuntu@<master-public-dns>:~/
You can now ssh into your master node and submit an example topology named test-word-count-topology in distributed mode (which Storm calls “remote”). Make sure all the Storm daemons (nimbus, supervisor, and ui) are still running from above, or restart them if necessary.
master:~$ sudo $STORM_HOME/bin/storm jar ~/storm-starter-*.jar storm.starter.WordCountTopology test-word-count-topology remote
If you now visit the Storm WebUI, you’ll see this Topology and can click on it to see various metrics, as well as deactivate, kill, or rebalance it. You can also see the Topology Visualization, with the rough latency of each bolt. See the documentation or Ch. 5.5 (pg. 116) of the Storm Applied book for more details on the WebUI.
You can see that Word Count is done in under 2ms!
You can see the source code for this Topology here. There are many details, which can be learned by looking into the core concepts and/or reading the Storm Applied book in the Dropbox, but the most salient code is the TopologyBuilder part, where the RandomSentenceSpout is connected to the SplitSentence bolt, and then finally connected to the WordCount bolt.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
The random sentences are sent randomly to different Supervisors with the shuffle grouping, since splitting sentences can be done by any node. However, the actual word count is done using an in-memory hash-map so it’s important that a given word always goes to the same node (e.g. the word “rain” always goes to the same machine so it can be efficiently counted). This is done using a fields grouping, where the field that determines where the tuple is sent is “word”.
The code for the WordCount bolt, which extends the built-in BaseBasicBolt, is pretty simple:
public static class WordCount extends BaseBasicBolt {
  Map<String, Integer> counts = new HashMap<String, Integer>();

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
      count = 0;
    count++;
    counts.put(word, count);
    collector.emit(new Values(word, count));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}
Interestingly enough, the SplitSentence bolt is actually written in Python using Storm’s multi-lang feature. It’s called from the Java code with
public static class SplitSentence extends ShellBolt implements IRichBolt {

  public SplitSentence() {
    super("python", "splitsentence.py");
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null;
  }
}
and the logic for splitting on whitespace is contained in splitsentence.py:
import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()
Using this technique, any spout or bolt can be written in almost any language!
For this topology, there is no output bolt so you can’t see the results, but there are pre-written bolts that interact with most major databases. Also, there are pre-written spouts that interact with tools like RabbitMQ and Kafka.
You can kill this topology from the UI or with:
master:~$ sudo $STORM_HOME/bin/storm kill test-word-count-topology
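You can also confirm that the topology is gone (or see everything that’s currently running) with the list command:
master:~$ sudo $STORM_HOME/bin/storm list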