Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Use async with for ID gens (#8383)
Browse files Browse the repository at this point in the history
This will allow us to hit the DB after we've finished using the generated stream ID.
  • Loading branch information
erikjohnston authored Sep 23, 2020
1 parent 916bb9d commit cbabb31
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 105 deletions.
1 change: 1 addition & 0 deletions changelog.d/8383.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor ID generators to use `async with` syntax.
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ async def add_account_data_to_room(
"""
content_json = json_encoder.encode(content)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
Expand Down Expand Up @@ -387,7 +387,7 @@ async def add_account_data_for_user(
"""
content_json = json_encoder.encode(content)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def add_messages_txn(txn, now_ms, stream_id):
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)

with await self._device_inbox_id_gen.get_next() as stream_id:
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
Expand Down Expand Up @@ -411,7 +411,7 @@ def add_messages_txn(txn, now_ms, stream_id):
txn, stream_id, local_messages_by_user_then_device
)

with await self._device_inbox_id_gen.get_next() as stream_id:
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_from_remote_to_device_inbox",
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async def add_user_signature_change_to_streams(
THe new stream ID.
"""

with await self._device_list_id_gen.get_next() as stream_id:
async with self._device_list_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"add_user_sig_change_to_streams",
self._add_user_signature_change_txn,
Expand Down Expand Up @@ -1093,7 +1093,7 @@ async def add_device_change_to_streams(
if not device_ids:
return

with await self._device_list_id_gen.get_next_mult(
async with self._device_list_id_gen.get_next_mult(
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
Expand All @@ -1108,7 +1108,7 @@ async def add_device_change_to_streams(
return stream_ids[-1]

context = get_active_span_text_map()
with await self._device_list_id_gen.get_next_mult(
async with self._device_list_id_gen.get_next_mult(
len(hosts) * len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ async def set_e2e_cross_signing_key(self, user_id, key_type, key):
key (dict): the key data
"""

with await self._cross_signing_id_gen.get_next() as stream_id:
async with self._cross_signing_id_gen.get_next() as stream_id:
return await self.db_pool.runInteraction(
"add_e2e_cross_signing_key",
self._set_e2e_cross_signing_key_txn,
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ async def _persist_events_and_state_updates(
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
stream_ordering_manager = await self._backfill_id_gen.get_next_mult(
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
else:
stream_ordering_manager = await self._stream_id_gen.get_next_mult(
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)

with stream_ordering_manager as stream_orderings:
async with stream_ordering_manager as stream_orderings:
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/group_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ def _register_user_group_membership_txn(txn, next_id):

return next_id

with await self._group_updates_id_gen.get_next() as next_id:
async with self._group_updates_id_gen.get_next() as next_id:
res = await self.db_pool.runInteraction(
"register_user_group_membership",
_register_user_group_membership_txn,
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

class PresenceStore(SQLBaseStore):
async def update_presence(self, presence_states):
stream_ordering_manager = await self._presence_id_gen.get_next_mult(
stream_ordering_manager = self._presence_id_gen.get_next_mult(
len(presence_states)
)

with stream_ordering_manager as stream_orderings:
async with stream_ordering_manager as stream_orderings:
await self.db_pool.runInteraction(
"update_presence",
self._update_presence_txn,
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def add_push_rule(
) -> None:
conditions_json = json_encoder.encode(conditions)
actions_json = json_encoder.encode(actions)
with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

if before or after:
Expand Down Expand Up @@ -585,7 +585,7 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE"
)

with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand Down Expand Up @@ -616,7 +616,7 @@ async def set_push_rule_enabled(
Raises:
NotFoundError if the rule does not exist.
"""
with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()
await self.db_pool.runInteraction(
"_set_push_rule_enabled_txn",
Expand Down Expand Up @@ -754,7 +754,7 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
data={"actions": actions_json},
)

with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ async def add_pusher(
last_stream_ordering,
profile_tag="",
) -> None:
with await self._pushers_id_gen.get_next() as stream_id:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert(
Expand Down Expand Up @@ -344,7 +344,7 @@ def delete_pusher_txn(txn, stream_id):
},
)

with await self._pushers_id_gen.get_next() as stream_id:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def graph_to_linear(txn):
"insert_receipt_conv", graph_to_linear
)

with await self._receipts_id_gen.get_next() as stream_id:
async with self._receipts_id_gen.get_next() as stream_id:
event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt",
self.insert_linearized_receipt_txn,
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ def store_room_txn(txn, next_id):
},
)

with await self._public_room_id_gen.get_next() as next_id:
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"store_room_txn", store_room_txn, next_id
)
Expand Down Expand Up @@ -1204,7 +1204,7 @@ def set_room_is_public_txn(txn, next_id):
},
)

with await self._public_room_id_gen.get_next() as next_id:
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
Expand Down Expand Up @@ -1284,7 +1284,7 @@ def set_room_is_public_appservice_txn(txn, next_id):
},
)

with await self._public_room_id_gen.get_next() as next_id:
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def add_tag_txn(txn, next_id):
)
self._update_revision_txn(txn, user_id, room_id, next_id)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction("add_tag", add_tag_txn, next_id)

self.get_tags_for_user.invalidate((user_id,))
Expand All @@ -232,7 +232,7 @@ def remove_tag_txn(txn, next_id):
txn.execute(sql, (user_id, room_id, tag))
self._update_revision_txn(txn, user_id, room_id, next_id)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction("remove_tag", remove_tag_txn, next_id)

self.get_tags_for_user.invalidate((user_id,))
Expand Down
Loading

0 comments on commit cbabb31

Please sign in to comment.