Adding example showing real-time event handling from API requests #735

Merged
merged 1 commit into from
Sep 5, 2023
28 changes: 28 additions & 0 deletions examples/event-api/README.md
@@ -0,0 +1,28 @@
# Example: Embedded event-processing API

This example shows how to use Kaskada for real-time, low-latency event processing
inside a service implemented in Python.

## Running

Ensure your environment is set up. Using a virtual environment is recommended.

```sh
pip install -r requirements.txt
```

Next, run the service:

```sh
python server.py
```

You should see logs similar to "Waiting for events...".

Now, in a separate terminal/process, you can start sending JSON events to the server.
You'll see aggregated feature values in the response body:

```sh
curl -H "Content-Type: application/json" localhost:8080 -d '{"user": "me"}'
# > {"count": 1, "count_1m": 1}
```
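
The same request can be sent from Python. The following is a minimal sketch using only the standard library; the helper names `build_event_request` and `send_event` are hypothetical and not part of this example's code. It assumes the server from `server.py` is listening on `localhost:8080` and returns a JSON body, as the `curl` call above suggests.

```python
import json
import urllib.request


def build_event_request(event, url="http://localhost:8080/"):
    """Build a POST request carrying `event` as a JSON body (hypothetical helper)."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_event(event, url="http://localhost:8080/"):
    """Send the event and decode the JSON response (requires a running server)."""
    with urllib.request.urlopen(build_event_request(event, url), timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))


if __name__ == "__main__":
    # Assumes `python server.py` is already running in another terminal.
    print(send_event({"user": "me"}))
```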
91 changes: 91 additions & 0 deletions examples/event-api/server.py
@@ -0,0 +1,91 @@
#!/usr/bin/env python

import json, asyncio, time, uuid
import kaskada as kd
from aiohttp import web

async def main():
Collaborator

I wonder if we should make a page in the docs for this? We can either put it in a notebook and have it show up that way, or we can use literalinclude to pull in the code and document it in a page on the docs.

Collaborator Author

We could pull this into the docs. I just needed to get it accessible quickly, but we can definitely tweak now that it's available.

    kd.init_session()

    start = time.time()
    requestmap = dict()

    # Initialize event source with historical data
    events = kd.sources.PyList(
        rows = [{"ts": start, "user": "user_1", "request_id": "12345678-1234-5678-1234-567812345678"}],
        time_column = "ts",
        key_column = "user",
        time_unit = "s"
    )

    # Compute features over events
    output = (kd.record({
        "response": kd.record({
            "count": events.count(),
            "count_1m": events.count(window=kd.windows.Since.minutely())
        }),
        "request_id": events.col("request_id"),
        "ts": events.col("ts"),
    }))

    # Receive JSON messages in real-time
    async def handle_http(req: web.Request) -> web.Response:
        data = await req.json()

        # Add the current time to the event
        data["ts"] = time.time()

        # Create a future so the aggregated result can be returned in the API response
        request_id = str(uuid.uuid4())
        requestmap[request_id] = asyncio.Future()
        data["request_id"] = request_id

        # Send the event to Kaskada to be processed as a stream
        print(f"Got data: {data}")
        events.add_rows(data)

        # Wait for the response to be completed by the Kaskada handler
        print("Waiting for response")
        resp = await requestmap[request_id]

        # Return result as the response body
        print(f"Sending response: {resp}")
        return web.Response(text = json.dumps(resp))

    # Set up the async web server
    app = web.Application()
    app.router.add_post('/', handle_http)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()


    # Handle each conversation as it occurs
    print("Waiting for events...")
    async for row in output.run(materialize=True).iter_rows_async():
        try:
            # Ignore historical rows
            if row["ts"] <= start:
                continue

            print(f"Received from K*: {row}")

            # Look up the future created by the HTTP handler for this request
            request_id = row["request_id"]
            fut = requestmap.pop(request_id, None)
            if fut is None:
                print(f"Unrecognized request_id: {request_id}")
                continue

            # Completing the future unblocks the waiting HTTP handler
            fut.set_result(row["response"])

        except Exception as e:
            print(f"Failed to handle live event from Kaskada: {e}")

    # Wait for web server to terminate gracefully
    await runner.cleanup()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()
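
The request/response correlation used in `server.py` (one future per `request_id`, completed by the stream consumer) can be tried in isolation. The following is a minimal sketch, not the PR's code: an `asyncio.Queue` stands in for the Kaskada event stream, the running count stands in for the computed features, and the names `demo`, `handle`, `consume`, and `pending` are hypothetical.

```python
import asyncio
import uuid


async def demo():
    pending = {}              # request_id -> future awaited by the handler
    stream = asyncio.Queue()  # stand-in for the Kaskada event stream (assumption)

    async def handle(event):
        # Register a future under a fresh request_id, then publish the event.
        request_id = str(uuid.uuid4())
        pending[request_id] = asyncio.get_running_loop().create_future()
        await stream.put({**event, "request_id": request_id})
        # Suspend until the consumer completes this request's future.
        return await pending[request_id]

    async def consume():
        count = 0
        while True:
            row = await stream.get()
            count += 1  # stand-in for the aggregated feature values
            fut = pending.pop(row["request_id"], None)
            if fut is not None and not fut.done():
                fut.set_result({"count": count})

    consumer = asyncio.create_task(consume())
    results = await asyncio.gather(
        handle({"user": "me"}),
        handle({"user": "me"}),
    )
    consumer.cancel()
    return results


def run_demo():
    return asyncio.run(demo())


if __name__ == "__main__":
    print(run_demo())
```

Each handler only ever awaits its own future, so responses cannot be mixed up between concurrent requests even though all events flow through a single stream.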