-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: Ensure Celery leverages the Flask-SQLAlchemy session #26186
refactor: Ensure Celery leverages the Flask-SQLAlchemy session #26186
Conversation
e73ec92
to
a9b38d9
Compare
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #26186 +/- ##
==========================================
- Coverage 69.08% 66.84% -2.25%
==========================================
Files 1931 1930 -1
Lines 75351 75294 -57
Branches 8429 8429
==========================================
- Hits 52056 50330 -1726
- Misses 21148 22817 +1669
Partials 2147 2147
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
stats_logger.incr("error_sqllab_unhandled") | ||
query = get_query(query_id, session) | ||
return handle_query_error(ex, query, session) | ||
with override_user(security_manager.find_user(username)): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty much the same code as previously without the outer with
block.
@@ -130,28 +123,24 @@ def __init__(self, top_n: int = 5, since: str = "7 days ago") -> None: | |||
self.since = parse_human_datetime(since) if since else None | |||
|
|||
def get_payloads(self) -> list[dict[str, int]]: | |||
payloads = [] | |||
session = db.create_scoped_session() | |||
records = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty much the same code as previously without the try
block.
TaggedObject.object_type == "dashboard", | ||
TaggedObject.tag_id.in_(tag_ids), | ||
) | ||
tags = db.session.query(Tag).filter(Tag.name.in_(self.tags)).all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty much the same code as previously without the try
block.
for active_schedule in active_schedules: | ||
for schedule in cron_schedule_window( | ||
triggered_at, active_schedule.crontab, active_schedule.timezone | ||
active_schedules = ReportScheduleDAO.find_active() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pretty much the same code as previously without the outer with
block.
@john-bodley I think it would be great to label this PR with v4.0 and merge it during the breaking window to reuse the test/stabilization efforts that will occur during that period. |
Per,
as discussed with @michael-s-molina, though this is technically a non-breaking change, it seems prudent (from a safety perspective) to hold off merging this until the v4.0 breaking window. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I'm assuming that removing unnecessary commit
operations is not in the scope of this PR.
21272ca
to
d244d32
Compare
SUMMARY
As part of SIP-99 (specifically SIP-99A and SIP-99B) in order to ensure a consistent "unit of work"—via a single atomic unit—all operations should be associated with the same Flask-SQLAlchemy session.
The Flask-SQLAlchemy extension provides a scoped session (on a per request basis) with the necessary oversight, i.e., the session is closed after the request is complete which aids with connection pool management.
Historically Celery tasks have defined their own scoped session (with the option to use connection pooling) which needed to managed independently which added unnecessary code bloat and complexity and likely violated the "unit of work" construct if operations were leveraging both the Flask-SQLAlchemy and Celery sessions. Per this post it seems like Celery can piggyback off of the Flask-SQLAlchemy session so long as setup/teardown is handled correctly. There is actually an example of this in the official Flask documentation per the Celery with Flask document where it references the use of the Flask-SQLAlchemy session (
db.session
).This PR removes the need for a Celery specific session which is a step towards the goal of having all database operations (outside of the Alembic migrations) handled by the global
db.session
which is a necessary requirement in order for us to achieve the goal of an atomic unit of work.The one wrinkle with this approach is #10819 which explicitly leveraged a
NullPool
(as opposed toQueuePool
—if defined via theSQLALCHEMY_ENGINE_OPTIONS
configuration) to avoid using same session across multiple celery workers, however since we're tearing down the session a per task basis where, per here, it states,i.e., we should no longer be experiencing the connection bleeding issue and thus can leverage the efficiency of connection pooling which standardizing the code.
BEFORE/AFTER SCREENSHOTS OR ANIMATED GIF
TESTING INSTRUCTIONS
CI. Additionally I wasn't able to repo the issue mentioned in #10530.
ADDITIONAL INFORMATION