FI-2921: Support SQLAlchemy database result backend #3

Merged · 3 commits · Sep 25, 2023
1 change: 1 addition & 0 deletions Dockerfile
@@ -5,6 +5,7 @@ RUN apk add --no-cache ca-certificates tzdata && update-ca-certificates

# Install the required packages
RUN pip install --no-cache-dir redis flower
RUN pip install --no-cache-dir "SQLAlchemy>=1.4,<2" "psycopg2-binary>=2.9,<3"
Author

I don't love that we're choosing a version of these libraries to support when building the Docker image; that seems like a decision that should be left up to individual services. For example, what if one microservice wants to use SQLAlchemy 2.x.x, but another wants to use SQLAlchemy 1.4.x? Because Flower interacts with the same database that the microservice's own Celery worker writes to, it seems like the service should be managing the SQLAlchemy/Postgres library versions. If we want to adopt this, then we may need to shift our pattern away from building a final production Flower image, and instead publish a "base" Flower image that other repos extend in order to render their own results. Does that sound right?

In the meantime, I don't think we're deviating too far from existing Flower patterns by putting this here. The base branch of Flower already does pip install ... redis .... This introduces the exact same potential for dependency conflicts as I described above, but for Redis library versions instead of SQLAlchemy library versions.

Does that sound right?

Doesn't sound right to me. From what I have seen, Flower is designed to run as a standalone application. This means that the service's Celery result backend happens to be writing to a Postgres database, and Flower happens to be reading from that same Postgres database. In this scenario it shouldn't even matter whether or not Flower is using SQLAlchemy. The thing that matters is that the schemas Flower is reading from are the same schemas that Celery is writing to.

it seems like the service should be managing the SQLAlchemy/Postgres library versions

It's more that the service and Flower need to agree on the underlying Postgres schema representation, which likely means they need to coordinate on the same version of Celery.

ship

I'm going to cut this comment short:

I want to touch on matrix testing: certainly I would like to know with confidence that the shipped code is going to work with the known supported target versions.

This gets into a broader discussion of "how do we organize the manpower required to support a rolling release" though, especially because our customers (other devs) are largely going to expect the latest versions of libx/liby from PyPI, enforced by Dependabot even.

For now, this is OK as is. Maybe add a TODO comment to make sure we verify it works against Celery latest, stable, and oldstable?

I know it's a dead conversation, but the phrase

happens to be writing to a Postgres database and Flower happens to be reading from that same Postgres database

screams red flag to me. Please remind me: why are we not adding API methods to Celery that do the database reads, and reading from that new API in Flower?

why are we not adding API methods to Celery that do the database reads, and reading from that new API in Flower?

We are using the Celery API methods through the Celery result backend.
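To make that concrete, here's a rough sketch (app name and connection string are placeholders, not from this PR) of what "going through the result backend" means: Flower reads via the backend object that Celery itself constructs, and never opens its own raw connection to Postgres.

from celery import Celery

app = Celery("example", backend="db+postgresql://postgres:postgres@localhost:5432/postgres")
backend = app.backend                      # a celery.backends.database.DatabaseBackend instance
with backend.ResultSession() as session:   # SQLAlchemy session created and managed by Celery
    pass  # DatabaseBackendResultsStore runs its queries inside a session like this one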

These points are exactly why this change is a red flag.

@phillipuniverse Sep 21, 2023

FYI @MaxwellPayne, @dylan-shipwell and I talked about this offline. We're going to move forward here, but we need to write up the tradeoffs we're making and have a diagram of how the Flower deployable with these customizations interacts with the Celery result backend. I was going to take a first stab at that, and the three of us can review to make sure we have the risks and issues fully documented and identified, and can objectively assess, as we continue using Celery, whether the tradeoffs we are making now are still legit.

Author

Agreed. I think in this situation, there is no "right answer." So we should disagree and commit, and write down the reasons why we're making the choices. Do you have a ticket for this that we can reference for posterity?

@MaxwellPayne I created https://shipwell.atlassian.net/browse/SHARE-2822. Isn't there an epic you have going somewhere for these Celery tasks already?

Author

Thanks. 🙏 Just updated the ticket with the relevant epic.


# PYTHONUNBUFFERED: Force stdin, stdout and stderr to be totally unbuffered. (equivalent to `python -u`)
# PYTHONHASHSEED: Enable hash randomization (equivalent to `python -R`)
8 changes: 8 additions & 0 deletions docker-compose.yml
@@ -4,6 +4,14 @@ services:
image: redis:alpine
ports:
- 6379:6379
postgres:
image: postgres:14-alpine
environment:
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- 5432:5432
prometheus:
image: prom/prometheus
volumes:
16 changes: 9 additions & 7 deletions flower/utils/results/result.py
@@ -2,7 +2,7 @@
import heapq
from collections.abc import Iterator, Iterable
from functools import total_ordering
from typing import Any, Sequence, TypeAlias
from typing import Any, TypeAlias

import dateutil.parser
import kombu.clocks
@@ -14,12 +14,12 @@ def __init__(
*,
task_id: str,
status: str,
date_done: str,
date_done: str | datetime.datetime,
Author

Convenience change to __init__() so that different backends can more easily initialize a Result(...) instance despite the backend subclasses representing timestamps using different data types.
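As a rough illustration (field values are placeholders), both of these now construct cleanly, whereas previously the datetime variant would have been handed to dateutil.parser.parse() and raised:

import datetime
from flower.utils.results.result import Result

# Redis-style: the timestamp arrives as an ISO-8601 string and is parsed with dateutil
r1 = Result(task_id="t1", status="SUCCESS", date_done="2023-09-20T12:00:00+00:00",
            result=None, traceback=None)

# Database-style: the timestamp arrives as a datetime straight off the ORM column
r2 = Result(task_id="t2", status="SUCCESS",
            date_done=datetime.datetime(2023, 9, 20, 12, 0, tzinfo=datetime.timezone.utc),
            result=None, traceback=None)

assert isinstance(r1.date_done, datetime.datetime) and isinstance(r2.date_done, datetime.datetime)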

result: Any,
traceback: Any,
# fields with default values may be null when Celery's `result_extended=False`
args: Sequence[Any] | None = None,
kwargs: dict[str, Any] | None = None,
args: Any = None,
kwargs: Any = None,
Comment on lines -21 to +22
Author

In theory, args should always be a Sequence and kwargs should always be a dict (or at least a Mapping). But I ran into some complexities when it comes to deserializing args/kwargs from binary columns from SQLAlchemy records.

Because results are long-lived, it's possible that a result database table has records that, at the time of creation, used different encoding schemes (e.g. JSON, YAML, pickle, custom). Our result data store makes a best-effort attempt to deserialize these binary values, but ultimately it's very possible that some records could completely fail to deserialize. For example, if someone created a task result row using a "custom" serializer that has since been deleted from the user's Celery codebase, it will be impossible for our result store to deserialize that row.

To avoid 500 errors, our result stores can gracefully handle these failures by simply returning the raw args or kwargs data as it looked when extracted from the data store. Even if we fail to deserialize and you get something ugly like b'some_kwarg : some_value', it's still better than nothing. We don't do much manipulation of args or kwargs here anyway, so we can afford to sacrifice type specificity for the sake of resilience.
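A quick sketch of how that plays out at render time (values are made up): to_render_dict() only repr()s args/kwargs when they aren't already strings, so the fallback message is displayed verbatim rather than being quoted a second time.

from flower.utils.results.result import Result

r = Result(task_id="t3", status="SUCCESS", date_done="2023-09-20T00:00:00",
           result=None, traceback=None,
           kwargs="Failed to deserialize binary value: b'some_kwarg : some_value'")
rendered = r.to_render_dict()
# the string passes through untouched; a real dict would have been repr()'d instead
assert rendered["kwargs"].startswith("Failed to deserialize")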

I followed some of this but I need to see it live to really internalize it. But what you're saying makes sense and sounds like you've gone through a good evaluation of the edge cases 👍

name: str | None = None,
# add graceful handling for extra fields that may have been persisted to the result backend record
**kw: Any,
@@ -30,7 +30,9 @@ def __init__(
"""
self.task_id = task_id
self.status = status
self.date_done: datetime.datetime = dateutil.parser.parse(date_done)
self.date_done: datetime.datetime = (
dateutil.parser.parse(date_done) if isinstance(date_done, str) else date_done
)
self.result = result
self.traceback = traceback
self.name = name
@@ -56,8 +58,8 @@ def to_render_dict(self) -> dict[str, Any]:
"task_id": self.task_id,
"status": self.status,
"date_done": self.date_done.timestamp(),
"args": repr(self.args),
"kwargs": repr(self.kwargs),
"args": self.args if isinstance(self.args, str) else repr(self.args),
"kwargs": self.kwargs if isinstance(self.kwargs, str) else repr(self.kwargs),
"result": repr(self.result),
"traceback": str(self.traceback),
}
101 changes: 98 additions & 3 deletions flower/utils/results/stores/database.py
@@ -1,13 +1,108 @@
import json
import pickle
from collections.abc import Iterator
from typing import Any

from celery.backends.database import DatabaseBackend
import kombu.serialization
import sqlalchemy as sa
from celery.backends.database import DatabaseBackend, TaskExtended
from sqlalchemy.orm.session import Session

from flower.utils.results.result import ResultIdWithResultPair
from flower.utils.results.result import ResultIdWithResultPair, Result
from flower.utils.results.stores import AbstractBackendResultsStore


class DatabaseBackendResultsStore(AbstractBackendResultsStore[DatabaseBackend]):
"""
Author

Changes to this class are the meat of the PR.

Results store capable of reading from Celery's supported `DatabaseBackend`, which uses SQLAlchemy models to persist
tasks in a SQL database.
"""
def results_by_timestamp(self, limit: int | None = None, reverse: bool = True) -> Iterator[
ResultIdWithResultPair
]:
raise NotImplementedError()
query_limit = self.max_tasks_in_memory
if limit is not None and limit < query_limit:
query_limit = limit

session: Session
with self.backend.ResultSession() as session:
ordering = TaskExtended.date_done.desc() if reverse else TaskExtended.date_done.asc()
task_select_query = sa.select(TaskExtended).order_by(ordering).limit(query_limit)
for task in session.execute(task_select_query).scalars():
result = self._map_task_to_result(task)
yield result.task_id, result

def _map_task_to_result(self, task: TaskExtended) -> Result:
"""
Convert a `TaskExtended` ORM object into our shared `Result` data structure. This class assumes the usage of
`TaskExtended` in order to query the "taskmeta" table, since `TaskExtended` queries can successfully return
full data for both tasks that were saved with `result_extended=True` and those that were saved with
`result_extended=False`.

Because we want to support both extended and non-extended tasks, we need a way to figure out whether the
provided task was _actually_ extended or not at the time it was saved. We can do this by looking at the `name`
field. When a task is saved under `result_extended=True`, then it will have a name referencing the name of the
function. Otherwise, that field will be null and we know that it was `result_extended=False`.
Comment on lines +42 to +45

Clever... how stable do you think this check is?

In general I'd like to advocate against choosing to have the semi-predicate problem; if we can include a bit for is_result_extended, I would much prefer using it, especially when we're in Python and have tuples/dicts given to us for free.

Author

I think it's pretty stable. I've dug deep into the Celery implementation, and it seems like when you have extended=True, you get name, args, and kwargs. I figured that name is the best of those to use, since it may be possible for args or kwargs to be null for some reason.

to be null for some reason

I assume the obvious would be something like:

@app.task  # assuming `app` is the service's Celery() instance
def my_function():
    pass

my_function.delay()

Author

Yes, although I'm not sure whether Celery would store NULLs for a no-arg task, or whether it would store args=() and kwargs={}. My big fear was that this might vary in different scenarios, so I figured name was a safer choice.

"""
is_actually_extended: bool = task.name is not None
if is_actually_extended:
return Result(
task_id=task.task_id,
status=task.status,
date_done=task.date_done,
result=task.result,
traceback=task.traceback,
args=self.deserialize_binary_column_value(task.args),
kwargs=self.deserialize_binary_column_value(task.kwargs),
name=task.name,
)

return Result(
task_id=task.task_id,
status=task.status,
date_done=task.date_done,
result=task.result,
traceback=task.traceback,
)

def deserialize_binary_column_value(self, value: bytes) -> Any:
"""
Attempt to deserialize the provided `value` using the available serialization decoders. Celery stores task
`args` and `kwargs` in binary columns, but the proper decoding mechanism for those binary columns is not
immediately obvious. These fields get serialized based on whatever the value of `Celery.conf.result_serializer`
is at the time the task result is saved. However, it's possible that the value of that config setting changed
across different Celery processes, and therefore we may be dealing with a database that has co-mingled records
from different serializers. Unfortunately, there is no column in the database schema that records which
serializer was used for each task.
Comment on lines +70 to +76
@dylan-shipwell Sep 20, 2023

Sounds like somewhere there is a missing version/serial number recording which binary format was stored. Is there any chance we could add that bincompat monotonic number and fail out when our current bincompat number is larger than the recalled bincompat number?

Postgres does something I think is smart: they only support the current and previous bincompat numbers, so as long as you run the service once on every release version, it will upgrade the stored data up one version. LTO uses the same pattern.

Did this case get discovered in testing?

Aside: again, red flags all over this.

Again, the semi-predicate problem here means we don't actually know what went wrong at runtime, which is another red flag.

Author

I see your point about the semi-predicate problem; we would want some way to know that things are going wrong. Perhaps we could append some sort of (Failed to decode) prefix to the return value? That allows the user to know that an error took place, but they also have some visibility into the byte string in case it's useful.

Re: serial number. We're basically at the mercy of pickle, json, or whatever data encoding gets used. We don't control how Celery translates its args/kwargs/results to Postgres binary objects upon save, so it's not easy for us to introduce versioning or any other type of behavior to the database table.


To work around this limitation, this method takes guesses at the serialization of `value` based on whatever
serializers are available in the active `result_accept_content` or `accept_content` Celery config setting.
Each serializer will attempt deserialization, and if one succeeds, we return the deserialized value immediately.
If all deserialization attempts fail, we will gracefully return the original bytes value with a prefix message
explaining that deserialization failed.

TODO: currently this method only attempts deserialization with JSON and pickle. We should support more built-in
content types, and potentially allow for deserialization using custom encodings. We chose to limit the
supported serializers here because JSON and pickle are the only serialization mechanisms available without
additional dependencies (e.g. 'yaml' requires the inclusion of the third-party `yaml` library).
"""
celery_result_accept_content: list[str] = (
self.backend.app.conf.result_accept_content
or self.backend.app.conf.accept_content
)
accept_content_types: list[str] = [item.lower() for item in celery_result_accept_content]

if 'json' in accept_content_types or 'application/json' in accept_content_types:
try:
return kombu.serialization.registry._decoders['application/json'](value)
except (json.JSONDecodeError, UnicodeDecodeError):
pass

if 'pickle' in accept_content_types or 'application/x-python-serialize' in accept_content_types:
try:
return kombu.serialization.registry._decoders['application/x-python-serialize'](value)
except pickle.UnpicklingError:
pass

# couldn't deserialize; just fall back to an error message plus the `repr()` of the original byte string
return 'Failed to deserialize binary value: ' + repr(value)
3 changes: 1 addition & 2 deletions flower/utils/results/stores/redis.py
@@ -1,4 +1,3 @@
import json
from collections.abc import Iterator
from typing import Any

@@ -23,7 +22,7 @@ def results_by_timestamp(self, limit: int | None = None, reverse: bool = True) -
for key in self.backend.client.scan_iter(
match=task_key_prefix + ("*" if isinstance(task_key_prefix, str) else b"*")
):
result_data: dict[str, Any] = json.loads(self.backend.client.get(key))
result_data: dict[str, Any] = self.backend.decode_result(self.backend.client.get(key))
Author

This fixes an earlier issue with our Redis result store. Simply invoking the json.loads() method is incomplete, because it will not use the custom decoding logic known to Celery's deserialization methods. So we should instead use something within Celery-land to perform decoding.

Cool, makes sense!

result = Result(**result_data)
heap.push(result)

2 changes: 2 additions & 0 deletions requirements/dev.txt
@@ -1,4 +1,6 @@
-r default.txt
-r test.txt
redis>=4.3.6
SQLAlchemy>=1.4,<2
psycopg2-binary>=2.9,<3
pylint
70 changes: 68 additions & 2 deletions tests/unit/__init__.py
@@ -2,10 +2,13 @@
import unittest
from distutils.util import strtobool
from unittest.mock import patch
from urllib.parse import urlencode
from urllib.parse import urlencode, urlparse, urlunparse

import celery
import celery.backends.database.session
import sqlalchemy.schema
import tornado.testing
from celery.backends.database import DatabaseBackend
from celery.exceptions import ImproperlyConfigured
from tornado.ioloop import IOLoop
from tornado.options import options

@@ -60,3 +63,66 @@ def setUpClass(cls):
'__unittest_skip_why__',
f'Skipping this test case due to the "{skip_backend_tests_env_var}" being true',
)


class DatabaseBackendDependentTestCase(BackendDependentTestCase):
"""
Extension of `BackendDependentTestCase` that sets a default value for `self.app.conf.database_url` based on the
`TEST_DATABASE_CELERY_RESULT_BACKEND_CONNECTION_STRING` environment variable. If no such environment variable
exists, the setup will assume a localhost connection to Postgres.
"""
Comment on lines +69 to +73
Author

I struggle with how to structure our unit tests here, particularly for test classes that require a live backend data store in order to work properly. The more backends we support, the more dependencies on external processes we will encounter. For example, I could see us needing Postgres, MongoDB, Redis, Memcached, RabbitMQ, and on and on in order to run a comprehensive unit test suite. I know that in other projects, we've used tools like embedded RabbitMQ or embedded Postgres so that our unit test suite can run with complete autonomy and no dependencies on docker containers running those dependencies alongside the test process. But installing those seemed like overkill here, given that upstream Flower currently does not have a solution to that issue.

As long as we're ok with adding a bunch of additional containers to docker-compose.yml, we should be able to run these tests successfully. However, I'm not convinced that the upstream Flower project will want to have all those dependencies. Just something to think about for future work.

As long as we're ok with adding a bunch of additional containers to docker-compose.yml, we should be able to run these tests successfully. However, I'm not convinced that the upstream Flower project will want to have all those dependencies. Just something to think about for future work.

The more sustainable approach is to use docker-py; here's an example of what I mean.

If you go this route it might be better to look into docker on whales; the docker-py library is kind of a shit show, it languished for literal years, like a full 12 months with a very high-severity security vulnerability active before they fixed it.
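For reference, a minimal sketch of the docker-py route (hypothetical fixture; the image and credentials just mirror docker-compose.yml, and none of this is in the PR):

import unittest

import docker


class PostgresContainerTestCase(unittest.TestCase):
    """Spin up a throwaway Postgres container for the duration of the test class."""

    @classmethod
    def setUpClass(cls):
        cls.docker_client = docker.from_env()
        cls.postgres = cls.docker_client.containers.run(
            "postgres:14-alpine",
            environment={
                "POSTGRES_DB": "postgres",
                "POSTGRES_USER": "postgres",
                "POSTGRES_PASSWORD": "postgres",
            },
            ports={"5432/tcp": 5432},
            detach=True,
        )

    @classmethod
    def tearDownClass(cls):
        cls.postgres.stop()
        cls.postgres.remove()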

This is at the heart of why I prefer to say "automated tests", not "unit tests": I want integration tests to be automated, which often means lots of nasty scripting, scripting that falls outside the realm of what unittest.TestCase is good at.

A great place to start is by documenting the manual steps taken to perform an integration test by hand.

For scripting against docker-compose or docker, docker:dind works totally great. I can pair up and give a demo of that any time; it's a bit technical, but it makes integrating against Docker with predictable state possible.


test_schema_name = 'test_flower'
"""
Name of the DB schema within which we should run tests. This should be separate from the main database schema so
we are safe to create/destroy records at-will throughout the testing lifecycle.
"""

def setUp(self):
super().setUp()
if hasattr(self, 'app'):
if not isinstance(self.app, celery.Celery):
raise ImproperlyConfigured(
'If `self.app` is initialized by another class setUp, it must be an instance of Celery'
)
else:
self.app = celery.Celery()

database_url_parsed = urlparse(
os.environ.get(
'TEST_DATABASE_CELERY_RESULT_BACKEND_CONNECTION_STRING',
'postgresql://postgres:postgres@localhost:5432',
)
)
if '+' in database_url_parsed.scheme:
raise ImproperlyConfigured(
'Should exclude the "+" from Celery database_url scheme and instead only supply the database protocol'
)
self.app.conf.database_url = urlunparse(database_url_parsed)

# restrict creation/deletion of DB models to a separate schema
self.app.conf.database_table_schemas = {
'task': self.test_schema_name,
'group': self.test_schema_name,
}

self.backend = DatabaseBackend(app=self.app)
self._ensure_test_schema()

def _ensure_test_schema(self) -> None:
"""
Create a short-lived session that executes a CREATE SCHEMA statement if the test schema does not yet exist
in the database.
"""
test_schema_name = self.test_schema_name

class CreateSchemaSessionManager(celery.backends.database.session.SessionManager):
def prepare_models(self, engine):
with engine.connect() as conn:
if not conn.dialect.has_schema(conn, test_schema_name):
conn.execute(sqlalchemy.schema.CreateSchema(test_schema_name))
return super().prepare_models(engine)

with self.backend.ResultSession(session_manager=CreateSchemaSessionManager()):
# invoking the context manager will invoke the `prepare_models()` that ensures a schema
pass