Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Convert sending mail to async/await. #7557

Merged
merged 2 commits into from
May 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7557.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Convert sending mail to async/await.
9 changes: 4 additions & 5 deletions synapse/handlers/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,7 @@ def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):

return changed

@defer.inlineCallbacks
def send_threepid_validation(
async def send_threepid_validation(
self,
email_address,
client_secret,
Expand Down Expand Up @@ -319,7 +318,7 @@ def send_threepid_validation(
"""
# Check that this email/client_secret/send_attempt combo is new or
# greater than what we've seen previously
session = yield self.store.get_threepid_validation_session(
session = await self.store.get_threepid_validation_session(
"email", client_secret, address=email_address, validated=False
)

Expand Down Expand Up @@ -353,7 +352,7 @@ def send_threepid_validation(
# Send the mail with the link containing the token, client_secret
# and session_id
try:
yield send_email_func(email_address, token, client_secret, session_id)
await send_email_func(email_address, token, client_secret, session_id)
except Exception:
logger.exception(
"Error sending threepid validation email to %s", email_address
Expand All @@ -364,7 +363,7 @@ def send_threepid_validation(
self.hs.clock.time_msec() + self.hs.config.email_validation_token_lifetime
)

yield self.store.start_or_continue_validation_session(
await self.store.start_or_continue_validation_session(
"email",
email_address,
session_id,
Expand Down
38 changes: 16 additions & 22 deletions synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import logging

from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled

from synapse.metrics.background_process_metrics import run_as_background_process
Expand Down Expand Up @@ -132,8 +131,7 @@ def _resume_processing(self):
self._is_processing = False
self._start_processing()

@defer.inlineCallbacks
def _process(self):
async def _process(self):
# we should never get here if we are already processing
assert not self._is_processing

Expand All @@ -142,7 +140,7 @@ def _process(self):

if self.throttle_params is None:
# this is our first loop: load up the throttle params
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.throttle_params = await self.store.get_throttle_params_by_room(
self.pusher_id
)

Expand All @@ -151,29 +149,28 @@ def _process(self):
while True:
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
await self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
finally:
self._is_processing = False

@defer.inlineCallbacks
def _unsafe_process(self):
async def _unsafe_process(self):
"""
Main logic of the push loop without the wrapper function that sets
up logging, measures and guards against multiple instances of it
being run.
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
fn = self.store.get_unread_push_actions_for_user_in_range_for_email
unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
unprocessed = await fn(self.user_id, start, self.max_stream_ordering)

soonest_due_at = None

if not unprocessed:
yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
return

for push_action in unprocessed:
Expand Down Expand Up @@ -201,15 +198,15 @@ def _unsafe_process(self):
"throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
}

yield self.send_notification(unprocessed, reason)
await self.send_notification(unprocessed, reason)

yield self.save_last_stream_ordering_and_success(
await self.save_last_stream_ordering_and_success(
max(ea["stream_ordering"] for ea in unprocessed)
)

# we update the throttle on all the possible unprocessed push actions
for ea in unprocessed:
yield self.sent_notif_update_throttle(ea["room_id"], ea)
await self.sent_notif_update_throttle(ea["room_id"], ea)
break
else:
if soonest_due_at is None or should_notify_at < soonest_due_at:
Expand All @@ -227,14 +224,13 @@ def _unsafe_process(self):
self.seconds_until(soonest_due_at), self.on_timer
)

@defer.inlineCallbacks
def save_last_stream_ordering_and_success(self, last_stream_ordering):
async def save_last_stream_ordering_and_success(self, last_stream_ordering):
if last_stream_ordering is None:
# This happens if we haven't yet processed anything
return

self.last_stream_ordering = last_stream_ordering
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success(
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
Expand Down Expand Up @@ -275,13 +271,12 @@ def room_ready_to_notify_at(self, room_id):
may_send_at = last_sent_ts + throttle_ms
return may_send_at

@defer.inlineCallbacks
def sent_notif_update_throttle(self, room_id, notified_push_action):
async def sent_notif_update_throttle(self, room_id, notified_push_action):
# We have sent a notification, so update the throttle accordingly.
# If the event that triggered the notif happened more than
# THROTTLE_RESET_AFTER_MS after the previous one that triggered a
# notif, we release the throttle. Otherwise, the throttle is increased.
time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
time_of_previous_notifs = await self.store.get_time_of_last_push_action_before(
notified_push_action["stream_ordering"]
)

Expand Down Expand Up @@ -310,14 +305,13 @@ def sent_notif_update_throttle(self, room_id, notified_push_action):
"last_sent_ts": self.clock.time_msec(),
"throttle_ms": new_throttle_ms,
}
yield self.store.set_throttle_params(
await self.store.set_throttle_params(
self.pusher_id, room_id, self.throttle_params[room_id]
)

@defer.inlineCallbacks
def send_notification(self, push_actions, reason):
async def send_notification(self, push_actions, reason):
logger.info("Sending notif email for user %r", self.user_id)

yield self.mailer.send_notification_mail(
await self.mailer.send_notification_mail(
self.app_id, self.user_id, self.email, push_actions, reason
)
Loading