Make sure you have `ipython`, `aiter`, and `nc` installed. In a pinch, you can use `telnet` instead of `nc`.
Launch `ipython` and run the following:

```python
from aiter.server import start_server_aiter

server, aiter = await start_server_aiter(7777)
async for _ in aiter:
    print(_)
```
or

```
ipython -c "from aiter.server import start_server_aiter; s, a = await start_server_aiter(7777); [print(_) async for _ in a]"
```
In another terminal, do the following:

```
nc localhost 7777
```

then hit control-C. Then do it a few more times. On each connection, you’ll see an ordered pair `(r, w)`, where `r` is a `StreamReader` and `w` is a `StreamWriter`.
Now try the following:

```python
from aiter.server import start_server_aiter

server, aiter = await start_server_aiter(7777)
count = 0
async for _ in aiter:
    print(_)
    count += 1
    if count >= 3:
        server.close()
```
This will accept three connections, then close the server.
Try launching it and connecting. After the third connection, the server will simply exit. There is no need to explicitly break out of the loop.
It’s easy to extract framed bytes out of a `StreamReader` and turn them into messages. Here is code that turns a `StreamReader` into an aiter of readline messages:

```python
async def stream_reader_to_line_aiter(sr):
    while True:
        r = await sr.readline()
        if len(r) == 0:
            break
        yield r
```
It’s pretty easy to see how this could be adapted for more complex message formats, either text or binary.
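For instance, here is a sketch of the same idea adapted to a hypothetical binary format in which each message is a 2-byte big-endian length followed by that many bytes of payload. The framing scheme and the name `stream_reader_to_packet_aiter` are made up for illustration; they are not part of `aiter`:

```python
import asyncio
import struct


async def stream_reader_to_packet_aiter(sr):
    # Hypothetical binary framing: a 2-byte big-endian length prefix,
    # then that many bytes of payload, repeated until the peer closes.
    while True:
        try:
            size_blob = await sr.readexactly(2)
            (size,) = struct.unpack(">H", size_blob)
            yield await sr.readexactly(size)
        except asyncio.IncompleteReadError:
            # the peer closed the connection; end the aiter
            break
```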
Now try `examples/2-line-server.py`. This accepts a single connection, then accepts messages terminated with a `"\n"` character, and echoes them. Try connecting with `nc` and type a few lines. Then exit with control-C (harsh exit) or control-D (clean exit). You’ll see that the server exits cleanly, indicating that the `line_aiter` completed. The error and the clean exit code paths are the same.
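If you want a rough idea of what such a server looks like without opening the file, here is a minimal sketch built from `start_server_aiter` and the line aiter above. It is a hedged approximation, not the actual contents of `examples/2-line-server.py`:

```python
import asyncio

from aiter.server import start_server_aiter


async def stream_reader_to_line_aiter(sr):
    # same generator as above: turn a StreamReader into an aiter of lines
    while True:
        r = await sr.readline()
        if len(r) == 0:
            break
        yield r


async def main():
    server, connection_aiter = await start_server_aiter(7777)
    async for r, w in connection_aiter:
        # take only the first connection, then stop accepting new ones
        server.close()
        async for line in stream_reader_to_line_aiter(r):
            # echo each "\n"-terminated line back to the client
            w.write(line)
            await w.drain()
        w.close()


asyncio.run(main())
```

Closing the connection ends the inner loop, and since the server has already been closed, the outer loop (and the program) ends as well, whether the client left harshly or cleanly.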
So far, we have:

```
server => aiter of (StreamReader, StreamWriter)
```

We have a function that makes the following transformation:

```
(StreamReader, StreamWriter) => aiter of (message, StreamWriter)
```

So we see how we can turn

```
aiter of (StreamReader, StreamWriter) => aiter of (aiter of (message, StreamWriter))
```

So we have an aiter of aiters. Whenever you see this construct, the thing you want is a `join_aiter`. This turns an aiter of aiters into a single aiter that is a union of the objects coming out of each constituent aiter.

This gives us a transformation from

```
aiter of (StreamReader, StreamWriter) => aiter of (message, StreamWriter)
```
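To get a feel for what a join is doing, here is a rough, self-contained sketch of the idea in plain asyncio. It is a conceptual illustration only, not the `join_aiter` shipped with `aiter`; the name `naive_join_aiter` and its internals are invented for this sketch:

```python
import asyncio


async def naive_join_aiter(aiter_of_aiters):
    # Merge an aiter of aiters into one aiter: each item is yielded as
    # soon as any constituent aiter produces it.
    queue = asyncio.Queue()
    DONE = object()

    async def drain(a):
        async for item in a:
            await queue.put(item)

    async def spawn_and_wait():
        tasks = []
        async for a in aiter_of_aiters:
            # start consuming each new aiter as soon as it appears
            tasks.append(asyncio.create_task(drain(a)))
        await asyncio.gather(*tasks)
        await queue.put(DONE)  # all constituent aiters are exhausted

    driver = asyncio.create_task(spawn_and_wait())
    while True:
        item = await queue.get()
        if item is DONE:
            break
        yield item
    await driver
```

In our case the constituent aiters are the per-connection message aiters, so the joined aiter produces `(message, StreamWriter)` pairs from every connected client, interleaved as they arrive.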
Now we see we can write one method to handle streams from all clients at once. See `examples/3-multi-client-server.py`.

Our main loop fetches events and processes them one at a time. This choice is fairly arbitrary; processing events could also be considered a transformation that accepts events and produces a result. This result could be something very simple, such as `None` or a summary of what happened to the message on its way through the pipeline. See `examples/4-total-pipeline.py` for an example.
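One way to picture that idea is to write the processing step itself as one more aiter stage. The names here (`process_events`, `event_aiter`) are invented for illustration and are not necessarily what the example uses:

```python
async def process_events(event_aiter):
    # Treat event handling as another pipeline stage: consume
    # (message, StreamWriter) pairs and yield a short summary of what
    # happened to each message.
    async for message, sw in event_aiter:
        sw.write(message)   # in this sketch, the "processing" is an echo
        await sw.drain()
        yield ("echoed", message)
```

The main loop then only has to drain this final aiter; everything interesting happens inside the pipeline stages.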
Why bother doing this? This will become clear when we add scaling.
Some events might launch a long or slow-running operation that takes a while to complete. If you look at the task model used in example 4, you’ll see that only one event is handled at a time. Also note that the command “wait” is special, and takes five seconds. Try making two connections and you’ll see that if you “wait” in one client, the other client becomes unresponsive.
This is clearly suboptimal. Luckily, there is an easy fix in `map_aiter`: use multiple workers. Using `map_aiter` with multiple workers has the side-effect that the order of items may change, since (obviously) fast events can be handled more quickly than slow events.
See `examples/5-parallel-processing.py` for an example.
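To see why parallel workers reorder results, here is a small, self-contained illustration in plain asyncio. It is not the `map_aiter` implementation; the queues and worker names are invented for this sketch. Several workers pull events from a shared queue and publish results as soon as they finish, so the slow "wait" event no longer blocks the others:

```python
import asyncio


async def worker(name, in_queue, out_queue):
    # Each worker pulls the next event as soon as it is free, so one
    # slow event does not hold up the fast ones.
    while True:
        event = await in_queue.get()
        if event is None:               # sentinel: no more events
            return
        if event == "wait":
            await asyncio.sleep(5)      # the slow operation from example 4
        await out_queue.put((name, event))


async def main():
    in_queue, out_queue = asyncio.Queue(), asyncio.Queue()
    workers = [asyncio.create_task(worker(f"w{i}", in_queue, out_queue))
               for i in range(3)]
    for event in ["wait", "a", "b", "c"]:
        await in_queue.put(event)
    for _ in workers:
        await in_queue.put(None)        # one sentinel per worker
    await asyncio.gather(*workers)
    while not out_queue.empty():
        print(out_queue.get_nowait())   # "a", "b" and "c" appear before "wait"


asyncio.run(main())
```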