KillrWeather is a reference application (which we are constantly improving) showing how to easily leverage and integrate Apache Spark, Apache Cassandra, and Apache Kafka for fast, streaming computations in asynchronous Akka event-driven environments. This application focuses on the use case of time series data.
I need fast access to historical data on the fly for predictive modeling with real time data from the stream.
Basic Spark, Kafka, Cassandra Samples
The use of time series data for business analysis is not new. What is new is the ability to collect and analyze massive volumes of data in sequence at extremely high velocity to get the clearest picture to predict and forecast future market changes, user behavior, environmental conditions, resource consumption, health trends and much, much more.
Apache Cassandra is a NoSQL database platform particularly suited for these types of Big Data challenges. Cassandra’s data model is an excellent fit for handling data in sequence regardless of data type or size. When writing data to Cassandra, data is sorted and written sequentially to disk. When retrieving data by row key and then by range, you get a fast and efficient access pattern due to minimal disk seeks – time series data is an excellent fit for this type of pattern. Apache Cassandra allows businesses to identify meaningful characteristics in their time series data as fast as possible to make clear decisions about expected future outcomes.
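The access pattern described above can be pictured with a small in-memory model. This is a minimal Scala sketch, not Cassandra's storage engine. It assumes a hypothetical partition key of (weather station ID, year) and clustering columns (month, day, hour), and keeps each partition in a sorted `TreeMap` so that a time-range read is one ordered slice rather than a scan.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical in-memory analogy of a Cassandra table, NOT its storage engine.
object WidePartitionSketch {
  type PartitionKey  = (String, Int)   // (weather station ID, year): assumed
  type ClusteringKey = (Int, Int, Int) // (month, day, hour): assumed

  final case class Reading(temperature: Double)

  // Each partition keeps its rows sorted by clustering key, the way
  // Cassandra orders rows within a partition.
  type Table = Map[PartitionKey, TreeMap[ClusteringKey, Reading]]

  val emptyTable: Table = Map.empty

  def insert(table: Table, pk: PartitionKey, ck: ClusteringKey, reading: Reading): Table = {
    val partition = table.getOrElse(pk, TreeMap.empty[ClusteringKey, Reading])
    table.updated(pk, partition.updated(ck, reading))
  }

  // Find one partition by key, then read the ordered slice [from, until).
  def rangeQuery(table: Table, pk: PartitionKey,
                 from: ClusteringKey, until: ClusteringKey): List[(ClusteringKey, Reading)] =
    table.get(pk).map(_.range(from, until).toList).getOrElse(Nil)
}
```

Rows can be inserted in any order; the range query still returns them in time order because the partition itself is sorted.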
There are many flavors of time series data. Some can be windowed in the stream; others cannot, because they are queried not by time slice but by a specific year, month, day, and hour. Spark Streaming lets you do both.
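To make the distinction concrete, here is a rough sketch on plain Scala collections rather than Spark Streaming. `inWindow` takes a time slice relative to "now", while `byHour` buckets readings by a specific (year, month, day, hour). The `TimedReading` type, the function names, and the choice of UTC for the calendar key are all assumptions for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}

// Hypothetical reading type: a UTC epoch timestamp plus a temperature.
final case class TimedReading(epochSeconds: Long, temperature: Double)

object WindowVsCalendarSketch {
  // Windowed query: readings after (now - windowSeconds) up to and including now,
  // wherever that slice happens to fall on the calendar.
  def inWindow(readings: Seq[TimedReading], now: Long, windowSeconds: Long): Seq[TimedReading] =
    readings.filter(r => r.epochSeconds > now - windowSeconds && r.epochSeconds <= now)

  // Calendar key (year, month, day, hour), computed in UTC (an assumption).
  def hourKey(epochSeconds: Long): (Int, Int, Int, Int) = {
    val t = LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC)
    (t.getYear, t.getMonthValue, t.getDayOfMonth, t.getHour)
  }

  // Calendar-bucketed query: group by a specific year/month/day/hour.
  def byHour(readings: Seq[TimedReading]): Map[(Int, Int, Int, Int), Seq[TimedReading]] =
    readings.groupBy(r => hourKey(r.epochSeconds))
}
```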
- KillrWeather Wiki
- com.datastax.killrweather Spark, Kafka and Cassandra workers
git clone https://github.com/killrweather/killrweather.git
cd killrweather
If this is your first time running SBT, you will be downloading the internet.
cd killrweather
sbt compile
# For IntelliJ users, this creates IntelliJ project files. As of
# version 14.x you should not need this; just import a new sbt project.
sbt gen-idea
1. Download the latest Cassandra and open the compressed file.
2. Start Cassandra. You may need to prepend the command with sudo, or chown /var/lib/cassandra. On the command line:
./apache-cassandra-{version}/bin/cassandra -f
3. Run the setup CQL scripts to create the schema and populate the weather stations table. On the command line, start a cqlsh shell:
cd /path/to/killrweather/data
path/to/apache-cassandra-{version}/bin/cqlsh
1. Download the latest Cassandra and double-click the installer.
2. Choose to run Cassandra automatically during start-up.
3. Run the setup CQL scripts to create the schema and populate the weather stations table. On the command line, start a cqlsh shell:
cd c:/path/to/killrweather/data
c:/path/to/cassandra/bin/cqlsh
You should see:
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh {latest.version} | Cassandra {latest.version} | CQL spec {latest.version} | Native protocol {latest.version}]
Use HELP for help.
cqlsh>
Run the scripts, then keep the CQL shell open so you can query it once the apps are running:
cqlsh> source 'create-timeseries.cql';
cqlsh> source 'load-timeseries.cql';
You will see this in all 3 app shells because log4j has been explicitly taken off the classpath:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
What we are really trying to isolate here is what is happening in the apps with regard to the event stream. You can add log4j locally.
To change any package log levels and see more activity, simply modify
1. Start KillrWeather
cd /path/to/killrweather
sbt app/run
As the KillrWeather app initializes, you will see the Akka Cluster, ZooKeeper, and the Kafka server start.
As each of the three apps loads, you will see its Akka Cluster node join and start collecting metrics. In a deployment with multiple nodes of each app, these metrics would be used to load-balance across nodes according to their health as the rest of the nodes join the cluster.
2. Start the Kafka data feed app. In a second shell, run:
sbt clients/run
You should see:
Multiple main classes detected, select one to run:
[1] com.datastax.killrweather.KafkaDataIngestionApp
[2] com.datastax.killrweather.KillrWeatherClientApp
Select KafkaDataIngestionApp and watch the shells for activity. You can stop the data feed or let it keep running.
After a few seconds you should see data by entering this in the cqlsh shell:
cqlsh> select * from isd_weather_data.raw_weather_data;
This confirms that data from the ingestion app has been published to Kafka and that raw data is streaming from Spark to Cassandra from the KillrWeatherApp.
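The raw rows start life as text messages on a Kafka topic. The sketch below shows, under assumptions, the kind of parsing step that sits between Kafka and Cassandra. It assumes a hypothetical seven-column comma-separated layout (the real ISD data has more columns), and the `RawReading` and `RawReadingParser` names are illustrative, not KillrWeather's classes.

```scala
import scala.util.Try

// Hypothetical, simplified record; the real raw weather data has more columns.
final case class RawReading(wsid: String, year: Int, month: Int, day: Int, hour: Int,
                            temperature: Double, oneHourPrecip: Double)

object RawReadingParser {
  // Parse one comma-separated message as it might arrive from a Kafka topic.
  // Malformed lines yield None instead of failing the whole stream.
  def parse(line: String): Option[RawReading] = {
    val f = line.split(",").map(_.trim)
    if (f.length != 7) None
    else Try(RawReading(f(0), f(1).toInt, f(2).toInt, f(3).toInt, f(4).toInt,
                        f(5).toDouble, f(6).toDouble)).toOption
  }
}
```

Dropping bad lines with `Option` is one design choice; a production pipeline might instead route them to a separate topic for inspection.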
cqlsh> select * from isd_weather_data.daily_aggregate_precip;
Unfortunately, the precipitation values in the sample data are mostly 0 (to do).
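A daily precipitation roll-up amounts to summing hourly values per station-day. This Scala sketch shows that idea on plain collections; `HourlyPrecip`, its fields, and `DailyPrecipSketch` are hypothetical, and this is not how KillrWeather itself computes or stores the aggregate.

```scala
// Hypothetical hourly record with only the fields the roll-up needs.
final case class HourlyPrecip(wsid: String, year: Int, month: Int, day: Int, hour: Int,
                              oneHourPrecip: Double)

object DailyPrecipSketch {
  type DayKey = (String, Int, Int, Int) // (wsid, year, month, day)

  // Sum the one-hour precipitation of every hour that falls on the same station-day.
  def dailyTotals(rows: Seq[HourlyPrecip]): Map[DayKey, Double] =
    rows.groupMapReduce(r => (r.wsid, r.year, r.month, r.day))(_.oneHourPrecip)(_ + _)
}
```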
3. Open a third shell and run the same command again, but this time select KillrWeatherClientApp:
sbt clients/run
This API client runs queries against the raw and aggregated data from the Kafka stream. It sends requests for varying locations and dates/times and, for some of them, triggers further aggregations at compute time, which are also saved to Cassandra:
- current weather
- daily temperatures
- monthly temperatures
- monthly high and low temperatures
- daily precipitations
- top-k precipitation
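Two of the queries listed above reduce to simple computations once a station's rows are in hand. The sketch below, with a hypothetical `DailyTemperature` type, summarizes one day's hourly temperatures (high, low, mean) and picks the top-k keys by precipitation total. It illustrates the arithmetic only, not the client's actual request/response API.

```scala
// Hypothetical result type for a "daily temperatures" query.
final case class DailyTemperature(high: Double, low: Double, mean: Double)

object ClientQuerySketch {
  // Summarize one station-day's hourly temperatures; None when there is no data.
  def dailyTemperature(hourly: Seq[Double]): Option[DailyTemperature] =
    if (hourly.isEmpty) None
    else Some(DailyTemperature(
      high = hourly.reduce(_ max _),
      low  = hourly.reduce(_ min _),
      mean = hourly.sum / hourly.size))

  // Top-k: the k keys (for example station-days) with the largest totals, largest first.
  def topK[K](totals: Map[K, Double], k: Int): Seq[(K, Double)] =
    totals.toSeq.sortBy(_._2)(Ordering.Double.TotalOrdering.reverse).take(k)
}
```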
Next I will add some forecasting with ML :)
Watch the app and client activity as requests and responses for weather and aggregate data flow through. Because querying the API triggers further aggregation of the original daily roll-ups, you can now see a new tier of temperature and precipitation aggregates. In the CQL shell:
cqlsh> select * from isd_weather_data.daily_aggregate_temperature;
cqlsh> select * from isd_weather_data.daily_aggregate_precip;
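This new tier works because some statistics can be rebuilt from the daily roll-ups alone, without going back to the raw data. In this hypothetical sketch, monthly highs, lows, and precipitation totals are derived from daily rows; `DailyRollup` and `MonthlyRollup` are illustrative names, not KillrWeather's schema.

```scala
// Hypothetical daily roll-up row and monthly result; fields are illustrative.
final case class DailyRollup(wsid: String, year: Int, month: Int, day: Int,
                             high: Double, low: Double, precip: Double)
final case class MonthlyRollup(high: Double, low: Double, precip: Double)

object MonthlyRollupSketch {
  type MonthKey = (String, Int, Int) // (wsid, year, month)

  // Max, min, and sum can be recomputed from the daily tier alone.
  // groupBy never yields empty groups, so reduce is safe here.
  def monthly(days: Seq[DailyRollup]): Map[MonthKey, MonthlyRollup] =
    days.groupBy(d => (d.wsid, d.year, d.month)).map { case (key, ds) =>
      key -> MonthlyRollup(
        high   = ds.map(_.high).reduce(_ max _),
        low    = ds.map(_.low).reduce(_ min _),
        precip = ds.map(_.precip).sum)
    }
}
```

Max, min, and sum compose across tiers this way; a monthly mean would not, because averaging daily means weights sparse and complete days equally, so a mean tier would also need per-day reading counts.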
- Run the app com.datastax.killrweather.KillrWeatherApp
- Run the Kafka data ingestion server com.datastax.killrweather.KafkaDataIngestionApp
- Run the API client com.datastax.killrweather.KillrWeatherClientApp
To close the cql shell:
cqlsh> quit;
DCOS lets you use predefined, auto-deployed versions of all the pieces of KillrWeather. The Akka actors run inside separate Docker containers deployed with Marathon. The systems are fault-tolerant, and Mesos scheduling allows each separate component to be highly available.
You will be using:
- A fresh DCOS cluster (or one with enough spare capacity to run the relevant containers)
- The DCOS CLI
- Cassandra
- Kafka
- Spark (Spark Streaming)
- SBT
- Akka, Spark-Cassandra, and Spark-Kafka components, which are all compiled into the apps with sbt
KillrWeather on DCOS uses the primitives for Cassandra, Kafka, and Spark. Starting from a fresh DCOS install, run the following commands from wherever you have installed the DCOS CLI:
dcos package install cassandra
dcos package install spark
dcos package install kafka
# after Kafka is installed:
dcos kafka add 0
dcos kafka start 0
You should now see all three services spinning up, as well as a Kafka broker starting.
You must make several changes to the Marathon app definitions included in the root directory of this project. Edit the environment variables to point to the ports assigned to Spark and the Kafka broker. For example, with Spark running on ports 10002 and 10003 and a Kafka broker running on port 1025, the relevant settings would look like:
"KAFKA_HOSTS": "broker-0.kafka.mesos:1025"
and
"SPARK_HA_MASTER": "mesos://spark.marathon.mesos:10002"
Note that you can use the Mesos UI to find these ports by looking at each respective task's sandbox.
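Because a wrong port here only shows up once the apps try to connect, it can help to validate these values at start-up. The sketch below is a hypothetical helper, not code from KillrWeather. It assumes `KAFKA_HOSTS` holds one or more comma-separated `host:port` entries and that `SPARK_HA_MASTER` must be a `mesos://` URL.

```scala
// Hypothetical start-up check for the Marathon environment variables;
// not KillrWeather's actual configuration code.
object MarathonEnvSketch {
  final case class HostPort(host: String, port: Int)

  // Parse "host:port" entries, assumed comma-separated when there are several.
  def parseHosts(value: String): Either[String, List[HostPort]] = {
    val entries = value.split(",").toList.map(_.trim).filter(_.nonEmpty)
    val parsed: List[Either[String, HostPort]] = entries.map { entry =>
      val i = entry.lastIndexOf(':')
      if (i <= 0) Left(s"missing host or port in '$entry'")
      else entry.substring(i + 1).toIntOption
        .map(port => HostPort(entry.substring(0, i), port))
        .toRight(s"bad port in '$entry'")
    }
    // Report the first error, otherwise return every parsed entry.
    if (parsed.isEmpty) Left("no Kafka hosts given")
    else parsed.collectFirst { case Left(err) => err }.toLeft(parsed.collect { case Right(hp) => hp })
  }

  // Spark's Mesos master URLs start with mesos://.
  def sparkMaster(value: String): Either[String, String] =
    if (value.startsWith("mesos://")) Right(value)
    else Left(s"expected a mesos:// URL, got '$value'")

  // Pass sys.env in a real app; a plain Map keeps the sketch testable.
  def fromEnv(env: Map[String, String]): Either[String, (List[HostPort], String)] =
    for {
      hosts  <- env.get("KAFKA_HOSTS").toRight("KAFKA_HOSTS is not set").flatMap(parseHosts)
      master <- env.get("SPARK_HA_MASTER").toRight("SPARK_HA_MASTER is not set").flatMap(sparkMaster)
    } yield (hosts, master)
}
```

In an app you could call `MarathonEnvSketch.fromEnv(sys.env)` and fail fast on a `Left`, so a misconfigured port surfaces in the Marathon task logs instead of as a silent connection timeout.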
You must wait for all services to become HEALTHY in the DCOS GUI. After that, deploy the Marathon app that sets up Cassandra's tables:
dcos marathon app add ./marathon-tables.json
Once this app is also healthy in the Marathon UI, you can install the three Akka apps (described in detail below):
dcos marathon app add ./marathon-app.json
dcos marathon app add ./marathon-client-kafka.json
dcos marathon app add ./marathon-client-analytics.json
You now have a fully running SMACK stack!