updated docs for 2 examples (#833)
* time_centric
* bluesky

related to #825
epinzur authored Oct 30, 2023
1 parent 89308e1 commit 41551a8
Showing 5 changed files with 229 additions and 361 deletions.
6 changes: 6 additions & 0 deletions python/docs/examples/_metadata.yml
@@ -1,7 +1,13 @@
execute:
  daemon: false
  warning: false

filters:
  - include-code-files

format:
  html:
    link-external-icon: true
    link-external-newwindow: true

jupyter: python3
10 changes: 10 additions & 0 deletions python/docs/examples/bluesky.py
@@ -26,6 +26,7 @@ async def main():
    # The firehose doesn't (currently) require authentication.
    at_client = AsyncFirehoseSubscribeReposClient()

    # [start_setup]
    # Set up the data source.
    # This defines (most of) the schema of the events we'll receive,
    # and tells Kaskada which fields to use for time and initial entity.
@@ -61,7 +62,9 @@ async def main():
        key_column="author",
        time_unit="s",
    )
    # [end_setup]

    # [start_incoming]
    # Handler for newly-arrived messages from BlueSky.
    async def receive_at(message) -> None:
        # Extract the contents of the message and bail if it's not a "commit"
@@ -89,6 +92,9 @@ async def receive_at(message) -> None:
}
)

    # [end_incoming]

    # [start_result]
    # Handler for values emitted by Kaskada.
    async def receive_outputs():
        # We'll perform a very simple aggregation - key by language and count.
@@ -98,8 +104,12 @@ async def receive_outputs():
        async for row in posts_by_first_lang.count().run_iter(kind="row", mode="live"):
            print(f"{row['_key']} has posted {row['result']} times since startup")

    # [end_result]

    # [start_run]
    # Kick off the two async processes concurrently.
    await asyncio.gather(at_client.start(receive_at), receive_outputs())
    # [end_run]


# Copied from https://raw.githubusercontent.com/MarshalX/atproto/main/examples/firehose/process_commits.py
@@ -17,7 +17,7 @@ You can see the full example in the file [bluesky.py](https://github.com/kaskada
Before we can receive events from Bluesky, we need to create a data source to tell Kaskada how to handle the events.
We'll provide a schema and configure the time and entity fields.

```{.python include="bluesky.py" code-line-numbers="true" start-line=26 end-line=53 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=30 end-line=64 dedent=4}
```
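To make the roles of the schema, time column, and key column more concrete, here is a minimal plain-Python sketch. It does not use Kaskada: `SourceConfig`, `validate`, and the `text` field are hypothetical stand-ins. Only `key_column="author"` and `time_unit="s"` come from the example, and naming the time column `time` is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class SourceConfig:
    """Hypothetical stand-in for a data source definition (not the Kaskada API)."""

    schema: Dict[str, type]  # field name -> expected Python type
    time_column: str         # field holding the event time
    key_column: str          # field holding the initial entity key
    time_unit: str = "s"     # unit of the time column

    def validate(self, event: Dict[str, Any]) -> None:
        """Raise ValueError if the event does not match the schema."""
        for name, expected in self.schema.items():
            if name not in event:
                raise ValueError(f"missing field: {name}")
            if not isinstance(event[name], expected):
                raise ValueError(f"field {name!r} should be {expected.__name__}")


config = SourceConfig(
    schema={"author": str, "text": str, "time": float},
    time_column="time",   # assumed column name
    key_column="author",  # matches the example
)

event = {"author": "did:example:alice", "text": "hello", "time": 1698624000.0}
config.validate(event)  # passes silently for a well-formed event
```

The point of the sketch is only that every incoming event must carry the declared fields, including the two that identify when it happened and which entity it belongs to.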

## Define the incoming event handler
@@ -27,7 +27,7 @@ This handler parses the message to find [Commit](https://atproto.com/specs/repos
For each Commit, we'll parse out any [Post](https://atproto.com/blog/create-post#post-record-structure) messages.
Finally, we do some schema munging to get the Post into the event format we described when creating the data source.

```{.python include="bluesky.py" code-line-numbers="true" start-line=55 end-line=79 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=68 end-line=93 dedent=4}
```
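The shape of that handler logic (bail on non-commits, pull out posts, rebuild each one as a flat event) can be sketched without the atproto or Kaskada libraries. The message layout used here (`kind`, `author`, `posts`) and the helper name `posts_to_events` are assumptions made for illustration, not the real firehose format.

```python
import time
from typing import Any, Dict, List, Optional


def posts_to_events(
    message: Dict[str, Any], now: Optional[float] = None
) -> List[Dict[str, Any]]:
    """Turn one decoded message into zero or more source events.

    The message layout ("kind", "author", "posts") is hypothetical.
    """
    # Bail out early for anything that is not a commit.
    if message.get("kind") != "commit":
        return []

    timestamp = time.time() if now is None else now
    events = []
    for post in message.get("posts", []):
        # Build a plain dict from scratch so every event has the same fields.
        events.append(
            {
                "author": message["author"],  # entity key
                "time": timestamp,            # arrival time, in seconds
                "text": post.get("text", ""),
                "langs": post.get("langs", []),
            }
        )
    return events
```

Passing `now` explicitly keeps the function deterministic in tests; in a live handler we would leave it unset and let the arrival time be used.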

## Construct a real-time query and result handler
@@ -37,14 +37,14 @@ First we'll use `with_key` to regroup events by language, then we'll apply a sim
Finally, we create a handler for the transformed results - here just printing them out.


```{.python include="bluesky.py" code-line-numbers="true" start-line=81 end-line=89 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=98 end-line=105 dedent=4}
```
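As a rough mental model of what the query computes, here is a plain-Python stand-in for "regroup by language, then count". It is not how Kaskada evaluates the query. The `langs` field and the choice to skip posts without a language tag are assumptions; the `_key` and `result` names mirror the rows printed by the example.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Iterator


def count_by_first_lang(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield an updated row after each event, keyed by the post's first language."""
    counts: Counter = Counter()
    for event in events:
        langs = event.get("langs") or []
        if not langs:
            continue  # assumption: posts without a language tag are skipped
        key = langs[0]
        counts[key] += 1
        yield {"_key": key, "result": counts[key]}


sample = [{"langs": ["en"]}, {"langs": ["ja", "en"]}, {"langs": []}, {"langs": ["en"]}]
for row in count_by_first_lang(sample):
    print(f"{row['_key']} has posted {row['result']} times since startup")
```

Each incoming event produces one updated row for its key, which is why the live output prints a running count rather than a single final total.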

## Final touches

Now we just need to kick it all off by calling `asyncio.gather` on the two handler coroutines, which runs them concurrently on the same event loop.

```{.python include="bluesky.py" code-line-numbers="true" start-line=91 end-line=92 dedent=4}
```{.python include="bluesky.py" code-line-numbers="true" start-line=110 end-line=111 dedent=4}
```
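If `asyncio.gather` is unfamiliar, the following self-contained sketch shows the same pattern with two stand-in coroutines. `produce` and `consume` are hypothetical replacements for the firehose client and the result handler, and the queue between them is only there to make the sketch runnable on its own.

```python
import asyncio
from typing import List


async def produce(queue: asyncio.Queue, items: List[str]) -> None:
    """Stand-in for the firehose client: push each item, then a sentinel."""
    for item in items:
        await queue.put(item)
    await queue.put(None)  # signal that no more items will arrive


async def consume(queue: asyncio.Queue, seen: List[str]) -> None:
    """Stand-in for the result handler: read until the sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break
        seen.append(item)


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: List[str] = []
    # Run both coroutines concurrently; gather returns once both finish.
    await asyncio.gather(produce(queue, ["en", "ja", "en"]), consume(queue, seen))
    return seen


result = asyncio.run(main())
```

Unlike this sketch, the two coroutines in the Bluesky example are long-running, so `gather` there keeps going until the program is interrupted.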

Try running it yourself and playing with different transformations!