Skip to content

enricomarchesin/kafka-data-streaming-example

Repository files navigation

Public Transit Status with Apache Kafka

In this project, I've setup a streaming event pipeline around Apache Kafka and its ecosystem. Using public data from the Chicago Transit Authority, an event pipeline built around Kafka allows us to simulate and display the status of train lines in real time.

The project provides a website to watch trains move from station to station.

Final User Interface

Prerequisites

The following are required to complete this project:

  • Docker
  • Python 3.7
  • Access to a computer with a minimum of 16gb+ RAM and a 4-core CPU to execute the simulation

Description

The Chicago Transit Authority (CTA) has asked us to develop a dashboard displaying system status for its commuters. We have decided to use Kafka and ecosystem tools like REST Proxy and Kafka Connect to accomplish this task.

Our architecture looks like this:

Project Architecture

Step 1: Kafka Producers

The train stations emits some of the events that we need.

The CTA has placed a sensor on each side of every train station that can be programmed to take an action whenever a train arrives at the station.

Step 2: Kafka REST Proxy Producer

Our partners at the CTA have asked that we also send weather readings into Kafka from their weather hardware. Unfortunately, this hardware is old and we cannot use the Python Client Library due to hardware restrictions. Instead, we are going to use HTTP REST to send the data to Kafka from the hardware using Kafka's REST Proxy.

Step 3: Kafka Connect

Finally, we need to extract station information from our PostgreSQL database into Kafka. We've decided to use the Kafka JDBC Source Connector.

Step 4: Faust Stream Processor

We will leverage Faust Stream Processing to transform the raw Stations table that we ingested from Kafka Connect. The raw format from the database has more data than we need, and the line color information is not conveniently configured. To remediate this, we're going to ingest data from our Kafka Connect topic, and transform the data.

Step 5: KSQL Table

Next, we will use KSQL to aggregate turnstile data for each of our stations. Recall that when we produced turnstile data, we simply emitted an event, not a count. What would make this data more useful would be to summarize it by station so that downstream applications always have an up-to-date count.

Step 6: Kafka Consumers

With all of the data in Kafka, our final task is to consume the data in the web server that is going to serve the transit status pages to our commuters.

Documentation

Directory Layout

The project consists of two main directories, producers and consumers.


├── consumers
│   ├── consumer.py
│   ├── faust_stream.py
│   ├── ksql.py
│   ├── models
│   │   ├── lines.py
│   │   ├── line.py
│   │   ├── station.py
│   │   └── weather.py
│   ├── requirements.txt
│   ├── server.py
│   ├── topic_check.py
│   └── templates
│       └── status.html
└── producers
    ├── connector.py
    ├── models
    │   ├── line.py
    │   ├── producer.py
    │   ├── schemas
    │   │   ├── arrival_key.json
    │   │   ├── arrival_value.json
    │   │   ├── turnstile_key.json
    │   │   ├── turnstile_value.json
    │   │   ├── weather_key.json
    │   │   └── weather_value.json
    │   ├── station.py
    │   ├── train.py
    │   ├── turnstile.py
    │   ├── turnstile_hardware.py
    │   └── weather.py
    ├── requirements.txt
    └── simulation.py

Running and Testing

To run the simulation, you must first start up the Kafka ecosystem on their machine utilizing Docker Compose.

%> docker-compose up

Docker compose will take a 3-5 minutes to start, depending on your hardware. Please be patient and wait for the docker-compose logs to slow down or stop before beginning the simulation.

Once docker-compose is ready, the following services will be available:

Service Host URL Docker URL Username Password
Public Transit Status http://localhost:8888 n/a
Landoop Kafka Connect UI http://localhost:8084 http://connect-ui:8084
Landoop Kafka Topics UI http://localhost:8085 http://topics-ui:8085
Landoop Schema Registry UI http://localhost:8086 http://schema-registry-ui:8086
Kafka PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 PLAINTEXT://kafka0:9092,PLAINTEXT://kafka1:9093,PLAINTEXT://kafka2:9094
REST Proxy http://localhost:8082 http://rest-proxy:8082/
Schema Registry http://localhost:8081 http://schema-registry:8081/
Kafka Connect http://localhost:8083 http://kafka-connect:8083
KSQL http://localhost:8088 http://ksql:8088
PostgreSQL jdbc:postgresql://localhost:5432/cta jdbc:postgresql://postgres:5432/cta cta_admin chicago

Note that to access these services from your own machine, you will always use the Host URL column.

When configuring services that run within Docker Compose, like Kafka Connect you must use the Docker URL. When you configure the JDBC Source Kafka Connector, for example, you will want to use the value from the Docker URL column.

Running the Simulation

There are two pieces to the simulation, the producer and consumer. As you develop each piece of the code, it is recommended that you only run one piece of the project at a time.

However, when you are ready to verify the end-to-end system prior to submission, it is critical that you open a terminal window for each piece and run them at the same time. If you do not run both the producer and consumer at the same time you will not be able to successfully run the project.

To run the producer:

  1. cd producers
  2. virtualenv venv
  3. . venv/bin/activate
  4. pip install -r requirements.txt
  5. python simulation.py

Once the simulation is running, you may hit Ctrl+C at any time to exit.

To run the Faust Stream Processing Application:

  1. cd consumers
  2. virtualenv venv
  3. . venv/bin/activate
  4. pip install -r requirements.txt
  5. faust -A faust_stream worker -l info

To run the KSQL Creation Script:

  1. cd consumers
  2. virtualenv venv
  3. . venv/bin/activate
  4. pip install -r requirements.txt
  5. python ksql.py

To run the consumer:

  1. cd consumers
  2. virtualenv venv
  3. . venv/bin/activate
  4. pip install -r requirements.txt
  5. python server.py

Releases

No releases published

Packages

No packages published