This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database. #7636

Merged: 13 commits merged into develop on Jun 16, 2020

Conversation

erikjohnston (Member):

The aim here is to make it easier to reason about when streams are limited and when they're not, by moving the logic into the database functions themselves. This should mean we can kill off the db_query_to_update_function wrapper.

You might want to look at each commit individually.

This only does a subset of the streams, and simply merges the functionality from db_query_to_update_function into each stream's get-updates DB function:

def db_query_to_update_function(
    query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]]
) -> UpdateFunction:
    """Wraps a db query function which returns a list of rows to make it
    suitable for use as an `update_function` for the Stream class
    """

    async def update_function(instance_name, from_token, upto_token, limit):
        rows = await query_function(from_token, upto_token, limit)
        updates = [(row[0], row[1:]) for row in rows]
        limited = False
        if len(updates) >= limit:
            upto_token = updates[-1][0]
            limited = True

        return updates, upto_token, limited

    return update_function

cf. #7340
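As a sketch of the direction of the refactor (hypothetical names, not the actual Synapse storage methods): each stream's storage method itself returns the `(updates, upto_token, limited)` triple, instead of returning bare rows for the wrapper to post-process.

```python
from typing import List, Tuple


# Stand-in for the real SQL query; in Synapse this would be something like
# "SELECT stream_id, ... FROM <stream table> WHERE ? < stream_id
#  AND stream_id <= ? ORDER BY stream_id ASC LIMIT ?".
async def _fetch_rows(last_id: int, current_id: int, limit: int) -> List[tuple]:
    all_rows = [(i, "user%d" % i) for i in range(last_id + 1, current_id + 1)]
    return all_rows[:limit]


async def get_all_widget_updates(
    instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
    """Hypothetical storage method with the wrapper's logic inlined."""
    rows = await _fetch_rows(last_id, current_id, limit)

    updates = [(row[0], row[1:]) for row in rows]
    limited = False
    upto_token = current_id
    if len(updates) >= limit:
        # We may have dropped rows, so only claim to have gotten as far
        # as the last row we actually returned.
        upto_token = updates[-1][0]
        limited = True

    return updates, upto_token, limited
```

Once every stream's storage method follows this contract, db_query_to_update_function has nothing left to do.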

@erikjohnston erikjohnston requested a review from a team June 4, 2020 15:12
@richvdh richvdh removed the request for review from a team June 5, 2020 14:40
@erikjohnston erikjohnston force-pushed the erikj/refactor_db_queries_repl branch from 40c58c7 to b6e35f2 on June 8, 2020 14:52
@erikjohnston erikjohnston requested a review from a team June 8, 2020 16:02
@richvdh richvdh self-assigned this Jun 8, 2020
        rows.sort()
-       return rows[:limit]
+       return rows[:limit], current_id, False
richvdh (Member):

doesn't this need more intelligence?

erikjohnston (Member, Author):

Whoops, for some reason I thought we weren't limiting this one.

)
sql = """
SELECT stream_id, user_id
FROM push_rules_stream
richvdh (Member):

why are we stuffing all those columns into the table if we don't care about them... 😕

erikjohnston (Member, Author):

Oh, hmm, it does look like they're not used anywhere else either. Though I don't really propose doing anything in this PR.

Review thread on synapse/replication/tcp/streams/_base.py (resolved)
Comment on lines 1083 to 1085
"""Get updates for backfill replication stream, including all new
backfilled events and events that have gone from being outliers to not.
"""
richvdh (Member):

could you document the args and return format of the update functions? (Are the ids inclusive or exclusive?)

I know they are all much the same, but if I'm working on the storage layer, I don't want to have to go digging into how the replication layer works to know what a given function is supposed to do, and not having it written down explicitly is a good way for assumptions to be made and off-by-one errors to get in.

@richvdh richvdh removed their assignment Jun 9, 2020
@erikjohnston erikjohnston linked an issue Jun 9, 2020 that may be closed by this pull request
@@ -282,7 +282,7 @@ async def get_all_typing_updates(
            typing = self._room_typing[room_id]
            rows.append((serial, (room_id, list(typing))))
        rows.sort()
-       return rows[:limit], current_id, False
+       return rows[:limit], current_id, len(rows) > limit
richvdh (Member):

this is doing the wrong thing for the returned token when the limit is hit.

erikjohnston (Member, Author):

I am such a crank 🤦

erikjohnston (Member, Author):

MAYBE THIS TIME I'VE fIXeD IT?!!!??!1?

Two review threads on synapse/storage/data_stores/main/events_worker.py (outdated, resolved)

Returns:
A tuple consisting of: the updates, the position of the rows
returned up to, and whether we returned fewer rows than exists
richvdh (Member):

can you rephrase "the position of the rows returned up to"? it's somewhat unclear: inconsistent use of "position" and "token", inclusive or exclusive, etc.

"the last token included in the results", maybe?

erikjohnston (Member, Author):

Hopefully I've clarified it. Unfortunately, technically, the returned token doesn't have to be the last token included in the results (since current_id could be greater due to a gap)
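A docstring along these lines (illustrative wording on a hypothetical method, not the exact text the PR landed) would make the token semantics explicit, including the gap caveat above:

```python
def get_all_example_updates(instance_name, last_id, current_id, limit):
    """Get updates for a replication stream (hypothetical example method).

    Args:
        instance_name: The writer to fetch updates from.
        last_id: The token to fetch updates from. Exclusive.
        current_id: The token to fetch updates up to. Inclusive.
        limit: The requested limit for the number of rows to return. The
            function may return more or fewer rows.

    Returns:
        A tuple consisting of: the updates, a token that can be passed as
        `last_id` in a subsequent call to fetch further updates, and
        whether the limit caused rows between the requested tokens to be
        omitted.

        Note the returned token is not necessarily the token of the last
        returned row: if there is a gap in the stream it may be greater.
    """
```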

async def get_all_new_backfill_event_rows(
    self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
    """Get updates for backfill replication stream, including all new
richvdh (Member):

this docstring is very helpful, but please can all the updated/new storage methods have one, not just this method?

Review thread on synapse/replication/tcp/streams/_base.py (resolved)
@richvdh (Member) left a review:

lgtm!

@erikjohnston erikjohnston merged commit f6f7511 into develop Jun 16, 2020
@erikjohnston erikjohnston deleted the erikj/refactor_db_queries_repl branch June 16, 2020 16:10
babolivier pushed a commit that referenced this pull request Sep 1, 2021
* commit 'f6f7511a4':
  Refactor getting replication updates from database. (#7636)

Successfully merging this pull request may close these issues.

More AssertionErrors from replication_notifier
2 participants