Commit

Merge pull request #9 from rsiemens/alpha-2

Alpha 2

rsiemens authored Dec 5, 2023
2 parents d91aa89 + b7a3b48 commit ef6353d
Showing 14 changed files with 1,492 additions and 1,345 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -12,7 +12,7 @@ jobs:
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
poetry-version: ["1.1.13"]
poetry-version: ["1.4.2"]
os: [ubuntu-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
7 changes: 1 addition & 6 deletions docs/api.rst
@@ -9,9 +9,4 @@ API Reference
.. automodule:: squids.serde
:members:
:undoc-members:
:show-inheritance:

.. automodule:: squids.routing
:members:
:undoc-members:
:show-inheritance:
:show-inheritance:
34 changes: 13 additions & 21 deletions docs/guide.rst
@@ -7,7 +7,7 @@ Application
The :class:`squids.App` serves as the central object for configuration, registering tasks, and creating consumers. It is
also responsible for knowing how to connect to SQS. It takes several arguments, but the ones you will use most often are
``name`` and ``boto_config``, the latter of which is optional. ``name`` is a string identifier for your application while
``boto_config`` is a dictionary of configuration values that are passed to `boto3.session.Session.resource <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource>`_.
``boto_config`` is a dictionary of configuration values that are passed to `boto3.session.Session.client <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client>`_.

.. code-block:: python
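# Hedged sketch (the original block is collapsed in this diff view): a minimal App
# setup. The import path follows the :class:`squids.App` reference above, while
# example_tasks.py further down imports from squids.core instead; the endpoint_url
# override mirrors the LocalStack-style value used there and is only illustrative.
from squids import App

app = App(
    "my-app",
    boto_config={"endpoint_url": "http://localhost:4566"},
)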
@@ -22,7 +22,7 @@ tasks, and consuming tasks. Registering a task looks like this:
.. code-block:: python
@app.task("emails")
def email_customer(to_addr, from_addr, body):
def email_customer(to_addr, body):
...
This will register the ``email_customer`` function as a task with the app. The :meth:`.App.task`
@@ -36,18 +36,18 @@ specified queue using :meth:`.Task.send` or :meth:`.Task.send_job` as demonstrat

.. code-block:: python
email_customer.send("foo@domain.com", "bar@domain.com", "Hello!")
email_customer.send("foo@domain.com", "Hello!")
email_customer.send_job(
args=("foo@domain.com", "bar@domain.com", "Hello!"),
args=("foo@domain.com", "Hello!"),
kwargs={},
options={"DelaySeconds": 5}
)
Calling ``send`` or ``send_job`` will ensure that any arguments and keyword arguments match the
signature of the function. If they don't, a ``TypeError`` will be raised. Both ``send`` and ``send_job``
will return a response of the same form, a `SQS.Queue.send_message <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.send_message>`_.
will return a response of the same form, a `SQS.Client.send_message <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/send_message.html>`_.
The difference between ``send`` and ``send_job`` is that ``send_job`` also accepts an ``options``
dict which accepts all the same arguments as `SQS.Queue.send_message <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.send_message>`_
dict which accepts all the same arguments as `SQS.Client.send_message <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/send_message.html>`_
except for the ``MessageBody``.
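For example, a minimal sketch of inspecting that response (reusing the task and address from the snippet above):

.. code-block:: python

    response = email_customer.send("foo@domain.com", "Hello!")
    # The response mirrors SQS.Client.send_message, so the standard keys apply.
    print(response["MessageId"])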

When you send a task it will, by default, serialize the arguments and keyword arguments using `json <https://docs.python.org/3/library/json.html>`_.
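A hedged illustration of what that implies: every argument must be JSON-serializable (the exact message envelope is not shown here).

.. code-block:: python

    import json
    from datetime import datetime

    json.dumps(["foo@domain.com", "Hello!"])  # fine, plain JSON types
    # json.dumps([datetime.utcnow()])         # would raise TypeError;
    #                                         # pass an ISO string instead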
@@ -59,15 +59,15 @@ You can still run your functions synchronously if you want.

.. code-block:: python
email_customer("foo@domain.com", "bar@domain.com", "Hello!")
email_customer("foo@domain.com", "Hello!")
Doing this will **not** send a task through the SQS queue, but instead simply call the function and
execute it in the calling process like normal.

Consuming Tasks
---------------

Once you have sent a task into an SQS queue you'll likely want run it eventually. To run the task
Once you have sent a task into an SQS queue you'll likely want to run it eventually. To run the task
you need to consume it. We can get a consumer for a queue by calling :meth:`.App.create_consumer`.
``create_consumer`` takes a single argument which is the queue name. Once we have the consumer we
can begin to consume and run our tasks like so:
@@ -82,11 +82,11 @@ can begin to consume and run our tasks like so:
:meth:`.Consumer.consume` will fetch messages from the ``emails`` SQS queue and run the function
associated with each received message. In our case it'll run the ``email_customer`` function. The
``options`` keyword argument is an optional dict that takes the same values as `SQS.Queue.receive_messages <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Queue.receive_messages>`_.
``options`` keyword argument is an optional dict that takes the same values as `SQS.Client.receive_message <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/receive_message.html>`_.
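Putting that together, a minimal sketch (the option values below are illustrative, not defaults prescribed by SQuidS):

.. code-block:: python

    consumer = app.create_consumer("emails")
    # options takes the same values as SQS.Client.receive_message.
    consumer.consume(options={"WaitTimeSeconds": 5, "MaxNumberOfMessages": 10})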

Often you'll want to be consuming your tasks in another process to keep from blocking your main
program. In those cases you can look at using the ``squids`` :ref:`command line consumer<Command Line Consumer>` tool which makes
this task easy.
program. If you don't want to build your own consumer clients you can look at using the ``squids`` :ref:`command line consumer<Command Line Consumer>`
tool which makes simple consuming of tasks easy.

Application Hooks
-----------------
@@ -97,7 +97,6 @@ There are a couple of hooks you can register with your application.
- :meth:`.App.post_send` - Runs producer side just after the task is sent to the SQS queue.
- :meth:`.App.pre_task` - Runs consumer side after the message is consumed, but just before the task is run.
- :meth:`.App.post_task` - Runs consumer side after the message is consumed and the task is run.
- :meth:`.App.report_queue_stats` - A callback that the command line consumer calls occasionally with various queue statistics.

.. code-block:: python
@@ -117,10 +116,6 @@ There are a couple of hooks you can register with your application.
def after_task(task):
...
@app.report_queue_stats
def report(queue_name, queue_stats):
...
These hooks provide a good opportunity for performing logging or metrics related to the production
and consumption of tasks.
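For instance, a hedged sketch of producer-side hooks that just log (the hook signatures follow the ones used in example_tasks.py at the bottom of this diff):

.. code-block:: python

    import logging

    logger = logging.getLogger("squids.example")

    @app.pre_send
    def log_before_send(queue, body):
        logger.info("sending to %s: %s", queue, body)

    @app.post_send
    def log_after_send(queue, body, response):
        logger.info("sent to %s, MessageId=%s", queue, response.get("MessageId"))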

@@ -134,7 +129,7 @@ scale out your rate of consumption.

.. code-block::
usage: squids [-h] -q QUEUE [-w WORKERS] -a APP [--report-interval REPORT_INTERVAL] [--polling-wait-time {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}] [--visibility-timeout VISIBILITY_TIMEOUT]
usage: squids [-h] -q QUEUE [-w WORKERS] -a APP [--polling-wait-time {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}] [--visibility-timeout VISIBILITY_TIMEOUT]
[--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
optional arguments:
@@ -144,9 +139,6 @@ scale out your rate of consumption.
-w WORKERS, --workers WORKERS
The number of workers to run. Defaults to the number of CPUs in the system
-a APP, --app APP Path to the application class something like package.module:app where app is an instance of squids.App
--report-interval REPORT_INTERVAL
How often to call the report_queue_stats callback with GetQueueAttributes for the queue in seconds. Defaults to 300 (5min). If no report_queue_stats callback has been registered then GetQueueAttributes will not be requested.
The report-interval is an "at the earliest" time. It may take longer depending on the polling-wait-time.
--polling-wait-time {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}
The WaitTimeSeconds for polling for messages from the queue. Consult the AWS SQS docs on long polling for more information about this setting. https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-
short-and-long-polling.html#sqs-long-polling
@@ -159,4 +151,4 @@ It works by creating a pool of worker processes. The consumer then passes the ta
by the workers. This allows for increased consumption throughput. The consumer will never consume
more than 2x the number of workers to prevent feeding tasks faster than the workers can process them.

If you need to increase the consumption rate then you can run the consumer on additional machines or pods.
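As a hedged example, an invocation using the flags documented above (the queue name and module path are placeholders)::

    squids --queue emails --app myproject.tasks:app --workers 4 --polling-wait-time 10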
9 changes: 5 additions & 4 deletions docs/quickstart.rst
@@ -19,6 +19,7 @@ Now we can start building our little application. Open up your editor of choice
app = App("squids-example-app")
@app.task(queue="squids-example")
def download_repo(repo, branch):
with requests.get(
@@ -30,6 +31,7 @@ Now we can start building our little application. Open up your editor of choice
f.write(chunk)
print(f"Download for {repo} complete")
if __name__ == "__main__":
res = requests.get("https://api.github.com/orgs/django/repos")
for repo in res.json():
@@ -43,7 +45,7 @@ Let's quickly go over what this does starting at the top.
app = App("squids-example-app")
This creates an instance of our SQuidS application, called ``squids-example-app``, which holds all the tasks we register and knows
how to communicate with SQS. It also takes an optional ``boto_config`` keyword argument which is a dictionary that takes the same values as `boto3.session.Session.resource <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.resource>`_.
how to communicate with SQS. It also takes an optional ``boto_config`` keyword argument which is a dictionary that takes the same values as `boto3.session.Session.client <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client>`_.

.. code-block:: python
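# Hedged placeholder for the block collapsed in this view; judging by the full
# snippet above, this step walks through the task registration:
@app.task(queue="squids-example")
def download_repo(repo, branch):
    ...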
@@ -89,7 +91,7 @@ Let's go ahead and run it. ::
Nice! Our tasks have been sent to the squids-example queue, but now we need a way to consume and
run them. SQuidS includes a command line consumer which you can use to quickly start consuming tasks. ::

$ squids --queue squids-example --app example:app

/###### /###### /## /## /######
/##__ ## /##__ ## |__/ | ## /##__ ##
@@ -105,7 +107,6 @@ run them. SQuidS includes a command line consumer which you can use to quickly s
app = squids-example-app
queue = squids-example
workers = 8
report-interval = 300
polling-wait-time = 5
visibility-timeout = 30
log-level = INFO
@@ -126,4 +127,4 @@ The consumer will fetch messages from the queue and then send them to worker pro
``download_repo`` function. If you take a look at your directory you should see a bunch of ``zip``
files for all the repos we downloaded.

To stop the consumer hit ``Ctrl+C``.
28 changes: 3 additions & 25 deletions example_tasks.py
@@ -1,28 +1,21 @@
import random
import time

from squids import routing
from squids.core import App, Task

app = App("test", boto_config={"endpoint_url": "http://localhost:4566"})


@app.report_queue_stats
def reporter(queue: str, queue_stats: dict):
print(f"Queue stats for {queue}")
print(f"\tAvailable: {queue_stats['ApproximateNumberOfMessages']}")
print(f"\tDelayed: {queue_stats['ApproximateNumberOfMessagesDelayed']}")
print(f"\tIn flight: {queue_stats['ApproximateNumberOfMessagesNotVisible']}")


@app.pre_send
def before_send(queue: str, body: dict):
print("Running before send hook")
print(queue, body)


@app.post_send
def after_send(queue: str, body: dict, response: dict):
print("Running after send hook")
print(queue, body, response)


@app.pre_task
@@ -73,19 +66,4 @@ def recursive_task(n):
print("I'm the last recursive task!")
else:
print(f"Running recursive task {n}")
recursive_task.send(n - 1)


@app.task(queue=["test", "special"], routing_strategy=routing.broadcast_strategy)
def broadcast_task(msg):
print(msg)


@app.task(queue=["test", "special", "other"]) # default random strategy
def random_queue_task(msg):
print(msg)


@app.task(queue=["test", "special", "other"], routing_strategy=routing.hash_strategy)
def hash_queue_task(msg):
print(msg)
recursive_task.send(n - 1)