Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use async with for ID gens #8383

Merged
merged 3 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8383.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor ID generators to use `async with` syntax.
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ async def add_account_data_to_room(
"""
content_json = json_encoder.encode(content)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as room_account_data has a unique constraint
# on (user_id, room_id, account_data_type) so simple_upsert will
# retry if there is a conflict.
Expand Down Expand Up @@ -387,7 +387,7 @@ async def add_account_data_for_user(
"""
content_json = json_encoder.encode(content)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
# no need to lock here as account_data has a unique constraint on
# (user_id, account_data_type) so simple_upsert will retry if
# there is a conflict.
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def add_messages_txn(txn, now_ms, stream_id):
rows.append((destination, stream_id, now_ms, edu_json))
txn.executemany(sql, rows)

with await self._device_inbox_id_gen.get_next() as stream_id:
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
Expand Down Expand Up @@ -411,7 +411,7 @@ def add_messages_txn(txn, now_ms, stream_id):
txn, stream_id, local_messages_by_user_then_device
)

with await self._device_inbox_id_gen.get_next() as stream_id:
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
"add_messages_from_remote_to_device_inbox",
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async def add_user_signature_change_to_streams(
THe new stream ID.
"""

with await self._device_list_id_gen.get_next() as stream_id:
async with self._device_list_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"add_user_sig_change_to_streams",
self._add_user_signature_change_txn,
Expand Down Expand Up @@ -1093,7 +1093,7 @@ async def add_device_change_to_streams(
if not device_ids:
return

with await self._device_list_id_gen.get_next_mult(
async with self._device_list_id_gen.get_next_mult(
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
Expand All @@ -1108,7 +1108,7 @@ async def add_device_change_to_streams(
return stream_ids[-1]

context = get_active_span_text_map()
with await self._device_list_id_gen.get_next_mult(
async with self._device_list_id_gen.get_next_mult(
len(hosts) * len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ async def set_e2e_cross_signing_key(self, user_id, key_type, key):
key (dict): the key data
"""

with await self._cross_signing_id_gen.get_next() as stream_id:
async with self._cross_signing_id_gen.get_next() as stream_id:
return await self.db_pool.runInteraction(
"add_e2e_cross_signing_key",
self._set_e2e_cross_signing_key_txn,
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ async def _persist_events_and_state_updates(
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
stream_ordering_manager = await self._backfill_id_gen.get_next_mult(
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
else:
stream_ordering_manager = await self._stream_id_gen.get_next_mult(
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)

with stream_ordering_manager as stream_orderings:
async with stream_ordering_manager as stream_orderings:
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream

Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/group_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ def _register_user_group_membership_txn(txn, next_id):

return next_id

with await self._group_updates_id_gen.get_next() as next_id:
async with self._group_updates_id_gen.get_next() as next_id:
res = await self.db_pool.runInteraction(
"register_user_group_membership",
_register_user_group_membership_txn,
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

class PresenceStore(SQLBaseStore):
async def update_presence(self, presence_states):
stream_ordering_manager = await self._presence_id_gen.get_next_mult(
stream_ordering_manager = self._presence_id_gen.get_next_mult(
len(presence_states)
)

with stream_ordering_manager as stream_orderings:
async with stream_ordering_manager as stream_orderings:
await self.db_pool.runInteraction(
"update_presence",
self._update_presence_txn,
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def add_push_rule(
) -> None:
conditions_json = json_encoder.encode(conditions)
actions_json = json_encoder.encode(actions)
with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

if before or after:
Expand Down Expand Up @@ -585,7 +585,7 @@ def delete_push_rule_txn(txn, stream_id, event_stream_ordering):
txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE"
)

with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand Down Expand Up @@ -616,7 +616,7 @@ async def set_push_rule_enabled(
Raises:
NotFoundError if the rule does not exist.
"""
with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()
await self.db_pool.runInteraction(
"_set_push_rule_enabled_txn",
Expand Down Expand Up @@ -754,7 +754,7 @@ def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
data={"actions": actions_json},
)

with await self._push_rules_stream_id_gen.get_next() as stream_id:
async with self._push_rules_stream_id_gen.get_next() as stream_id:
event_stream_ordering = self._stream_id_gen.get_current_token()

await self.db_pool.runInteraction(
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ async def add_pusher(
last_stream_ordering,
profile_tag="",
) -> None:
with await self._pushers_id_gen.get_next() as stream_id:
async with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert(
Expand Down Expand Up @@ -344,7 +344,7 @@ def delete_pusher_txn(txn, stream_id):
},
)

with await self._pushers_id_gen.get_next() as stream_id:
async with self._pushers_id_gen.get_next() as stream_id:
await self.db_pool.runInteraction(
"delete_pusher", delete_pusher_txn, stream_id
)
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ def graph_to_linear(txn):
"insert_receipt_conv", graph_to_linear
)

with await self._receipts_id_gen.get_next() as stream_id:
async with self._receipts_id_gen.get_next() as stream_id:
event_ts = await self.db_pool.runInteraction(
"insert_linearized_receipt",
self.insert_linearized_receipt_txn,
Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/databases/main/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ def store_room_txn(txn, next_id):
},
)

with await self._public_room_id_gen.get_next() as next_id:
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"store_room_txn", store_room_txn, next_id
)
Expand Down Expand Up @@ -1204,7 +1204,7 @@ def set_room_is_public_txn(txn, next_id):
},
)

with await self._public_room_id_gen.get_next() as next_id:
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
Expand Down Expand Up @@ -1284,7 +1284,7 @@ def set_room_is_public_appservice_txn(txn, next_id):
},
)

with await self._public_room_id_gen.get_next() as next_id:
async with self._public_room_id_gen.get_next() as next_id:
await self.db_pool.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def add_tag_txn(txn, next_id):
)
self._update_revision_txn(txn, user_id, room_id, next_id)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction("add_tag", add_tag_txn, next_id)

self.get_tags_for_user.invalidate((user_id,))
Expand All @@ -232,7 +232,7 @@ def remove_tag_txn(txn, next_id):
txn.execute(sql, (user_id, room_id, tag))
self._update_revision_txn(txn, user_id, room_id, next_id)

with await self._account_data_id_gen.get_next() as next_id:
async with self._account_data_id_gen.get_next() as next_id:
await self.db_pool.runInteraction("remove_tag", remove_tag_txn, next_id)

self.get_tags_for_user.invalidate((user_id,))
Expand Down
Loading