Skip to content

Commit

Permalink
add celery example
Browse files Browse the repository at this point in the history
  • Loading branch information
davidism committed Feb 10, 2023
1 parent dca8cf0 commit 3f19524
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 0 deletions.
7 changes: 7 additions & 0 deletions docs/patterns/celery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Celery itself.
.. _Celery: https://celery.readthedocs.io
.. _First Steps with Celery: https://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html

The Flask repository contains `an example <https://github.com/pallets/flask/tree/main/examples/celery>`_
based on the information on this page, which also shows how to use JavaScript to submit
tasks and poll for progress and results.


Install
-------
Expand Down Expand Up @@ -209,6 +213,9 @@ Now you can start the task using the first route, then poll for the result using
second route. This keeps the Flask request workers from being blocked waiting for tasks
to finish.

The Flask repository contains `an example <https://github.com/pallets/flask/tree/main/examples/celery>`_
using JavaScript to submit tasks and poll for progress and results.


Passing Data to Tasks
---------------------
Expand Down
27 changes: 27 additions & 0 deletions examples/celery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Background Tasks with Celery
============================

This example shows how to configure Celery with Flask, how to set up an API for
submitting tasks and polling results, and how to use that API with JavaScript. See
[Flask's documentation about Celery](https://flask.palletsprojects.com/patterns/celery/).

From this directory, create a virtualenv and install the application into it. Then run a
Celery worker.

```shell
$ python3 -m venv .venv
$ . ./.venv/bin/activate
$ pip install -r requirements.txt && pip install -e .
$ celery -A make_celery worker --loglevel INFO
```

In a separate terminal, activate the virtualenv and run the Flask development server.

```shell
$ . ./.venv/bin/activate
$ flask -A task_app --debug run
```

Go to http://localhost:5000/ and use the forms to submit tasks. You can see the polling
requests in the browser dev tools and the Flask logs. You can see the tasks submitting
and completing in the Celery logs.
4 changes: 4 additions & 0 deletions examples/celery/make_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from task_app import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]
11 changes: 11 additions & 0 deletions examples/celery/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[project]
name = "flask-example-celery"
version = "1.0.0"
description = "Example Flask application with Celery background tasks."
readme = "README.md"
requires-python = ">=3.7"
dependencies = ["flask>=2.2.2", "celery[redis]>=5.2.7"]

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
56 changes: 56 additions & 0 deletions examples/celery/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile pyproject.toml
#
amqp==5.1.1
# via kombu
async-timeout==4.0.2
# via redis
billiard==3.6.4.0
# via celery
celery[redis]==5.2.7
# via flask-example-celery (pyproject.toml)
click==8.1.3
# via
# celery
# click-didyoumean
# click-plugins
# click-repl
# flask
click-didyoumean==0.3.0
# via celery
click-plugins==1.1.1
# via celery
click-repl==0.2.0
# via celery
flask==2.2.2
# via flask-example-celery (pyproject.toml)
itsdangerous==2.1.2
# via flask
jinja2==3.1.2
# via flask
kombu==5.2.4
# via celery
markupsafe==2.1.2
# via
# jinja2
# werkzeug
prompt-toolkit==3.0.36
# via click-repl
pytz==2022.7.1
# via celery
redis==4.5.1
# via celery
six==1.16.0
# via click-repl
vine==5.0.0
# via
# amqp
# celery
# kombu
wcwidth==0.2.6
# via prompt-toolkit
werkzeug==2.2.2
# via flask
39 changes: 39 additions & 0 deletions examples/celery/src/task_app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from celery import Celery
from celery import Task
from flask import Flask
from flask import render_template


def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost",
result_backend="redis://localhost",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)

@app.route("/")
def index() -> str:
return render_template("index.html")

from . import views

app.register_blueprint(views.bp)
return app


def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)

celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
23 changes: 23 additions & 0 deletions examples/celery/src/task_app/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import time

from celery import shared_task
from celery import Task


@shared_task(ignore_result=False)
def add(a: int, b: int) -> int:
return a + b


@shared_task()
def block() -> None:
time.sleep(5)


@shared_task(bind=True, ignore_result=False)
def process(self: Task, total: int) -> object:
for i in range(total):
self.update_state(state="PROGRESS", meta={"current": i + 1, "total": total})
time.sleep(1)

return {"current": total, "total": total}
108 changes: 108 additions & 0 deletions examples/celery/src/task_app/templates/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<!doctype html>
<html>
<head>
<meta charset=UTF-8>
<title>Celery Example</title>
</head>
<body>
<h2>Celery Example</h2>
Execute background tasks with Celery. Submits tasks and shows results using JavaScript.

<hr>
<h4>Add</h4>
<p>Start a task to add two numbers, then poll for the result.
<form id=add method=post action="{{ url_for("tasks.add") }}">
<label>A <input type=number name=a value=4></label><br>
<label>B <input type=number name=b value=2></label><br>
<input type=submit>
</form>
<p>Result: <span id=add-result></span></p>

<hr>
<h4>Block</h4>
<p>Start a task that takes 5 seconds. However, the response will return immediately.
<form id=block method=post action="{{ url_for("tasks.block") }}">
<input type=submit>
</form>
<p id=block-result></p>

<hr>
<h4>Process</h4>
<p>Start a task that counts, waiting one second each time, showing progress.
<form id=process method=post action="{{ url_for("tasks.process") }}">
<label>Total <input type=number name=total value="10"></label><br>
<input type=submit>
</form>
<p id=process-result></p>

<script>
const taskForm = (formName, doPoll, report) => {
document.forms[formName].addEventListener("submit", (event) => {
event.preventDefault()
fetch(event.target.action, {
method: "POST",
body: new FormData(event.target)
})
.then(response => response.json())
.then(data => {
report(null)

const poll = () => {
fetch(`/tasks/result/${data["result_id"]}`)
.then(response => response.json())
.then(data => {
report(data)

if (!data["ready"]) {
setTimeout(poll, 500)
} else if (!data["successful"]) {
console.error(formName, data)
}
})
}

if (doPoll) {
poll()
}
})
})
}

taskForm("add", true, data => {
const el = document.getElementById("add-result")

if (data === null) {
el.innerText = "submitted"
} else if (!data["ready"]) {
el.innerText = "waiting"
} else if (!data["successful"]) {
el.innerText = "error, check console"
} else {
el.innerText = data["value"]
}
})

taskForm("block", false, data => {
document.getElementById("block-result").innerText = (
"request finished, check celery log to see task finish in 5 seconds"
)
})

taskForm("process", true, data => {
const el = document.getElementById("process-result")

if (data === null) {
el.innerText = "submitted"
} else if (!data["ready"]) {
el.innerText = `${data["value"]["current"]} / ${data["value"]["total"]}`
} else if (!data["successful"]) {
el.innerText = "error, check console"
} else {
el.innerText = "✅ done"
}
console.log(data)
})

</script>
</body>
</html>
38 changes: 38 additions & 0 deletions examples/celery/src/task_app/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from celery.result import AsyncResult
from flask import Blueprint
from flask import request

from . import tasks

bp = Blueprint("tasks", __name__, url_prefix="/tasks")


@bp.get("/result/<id>")
def result(id: str) -> dict[str, object]:
result = AsyncResult(id)
ready = result.ready()
return {
"ready": ready,
"successful": result.successful() if ready else None,
"value": result.get() if ready else result.result,
}


@bp.post("/add")
def add() -> dict[str, object]:
a = request.form.get("a", type=int)
b = request.form.get("b", type=int)
result = tasks.add.delay(a, b)
return {"result_id": result.id}


@bp.post("/block")
def block() -> dict[str, object]:
result = tasks.block.delay()
return {"result_id": result.id}


@bp.post("/process")
def process() -> dict[str, object]:
result = tasks.process.delay(total=request.form.get("total", type=int))
return {"result_id": result.id}

0 comments on commit 3f19524

Please sign in to comment.