Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3140 from matrix-org/rav/use_run_in_background
Browse files Browse the repository at this point in the history
Use run_in_background in preference to preserve_fn
  • Loading branch information
richvdh authored Apr 29, 2018
2 parents 1315d37 + 453adf0 commit aab2e4d
Show file tree
Hide file tree
Showing 22 changed files with 98 additions and 72 deletions.
4 changes: 2 additions & 2 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
Expand Down Expand Up @@ -229,7 +229,7 @@ def process_replication_rows(self, stream_name, token, rows):
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token)
run_in_background(self.update_token, token)

# We also need to poke the federation sender when new events happen
elif stream_name == "events":
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
Expand Down Expand Up @@ -140,7 +140,7 @@ def __init__(self, hs):

def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
preserve_fn(self.poke_pushers)(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)

@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
Expand Down
5 changes: 2 additions & 3 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -327,8 +327,7 @@ def __init__(self, hs):

def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)

preserve_fn(self.process_and_notify)(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)

def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
Expand Down
12 changes: 6 additions & 6 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from twisted.internet import defer

from synapse.appservice import ApplicationServiceState
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure

import logging
Expand Down Expand Up @@ -106,7 +106,7 @@ def __init__(self, txn_ctrl, clock):
def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
preserve_fn(self._send_request)(service)
run_in_background(self._send_request, service)

@defer.inlineCallbacks
def _send_request(self, service):
Expand Down Expand Up @@ -152,10 +152,10 @@ def send(self, service, events):
if sent:
yield txn.complete(self.store)
else:
preserve_fn(self._start_recoverer)(service)
except Exception as e:
logger.exception(e)
preserve_fn(self._start_recoverer)(service)
run_in_background(self._start_recoverer, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
Expand Down
28 changes: 17 additions & 11 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
PreserveLoggingContext,
preserve_fn
preserve_fn,
run_in_background,
)
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -127,7 +128,7 @@ def verify_json_objects_for_server(self, server_and_json):

verify_requests.append(verify_request)

preserve_fn(self._start_key_lookups)(verify_requests)
run_in_background(self._start_key_lookups, verify_requests)

# Pass those keys to handle_key_deferred so that the json object
# signatures can be verified
Expand Down Expand Up @@ -316,7 +317,7 @@ def on_err(err):
if not verify_request.deferred.called:
verify_request.deferred.errback(err)

preserve_fn(do_iterations)().addErrback(on_err)
run_in_background(do_iterations).addErrback(on_err)

@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
Expand All @@ -332,8 +333,9 @@ def get_keys_from_store(self, server_name_and_key_ids):
"""
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
run_in_background(
self.store.get_server_verify_keys,
server_name, key_ids,
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
Expand Down Expand Up @@ -361,7 +363,7 @@ def get_key(perspective_name, perspective_keys):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(p_name, p_keys)
run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
Expand Down Expand Up @@ -401,7 +403,7 @@ def get_key(server_name, key_ids):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(server_name, key_ids)
run_in_background(get_key, server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
Expand Down Expand Up @@ -484,7 +486,8 @@ def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
Expand Down Expand Up @@ -542,7 +545,8 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids):

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=key_server_name,
from_server=server_name,
verify_keys=verify_keys,
Expand Down Expand Up @@ -618,7 +622,8 @@ def process_v2_response(self, from_server, response_json,

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=server_name,
Expand Down Expand Up @@ -719,7 +724,8 @@ def store_keys(self, server_name, from_server, verify_keys):
# TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_verify_key)(
run_in_background(
self.store.store_server_verify_key,
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()
Expand Down
5 changes: 3 additions & 2 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -417,7 +417,8 @@ def random_server_list():
batch = set(missing_events[i:i + batch_size])

deferreds = [
preserve_fn(self.get_pdu)(
run_in_background(
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
)
Expand Down
4 changes: 2 additions & 2 deletions synapse/groups/attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background

from signedjson.sign import sign_json

Expand Down Expand Up @@ -196,4 +196,4 @@ def _renew_attestation(group_id, user_id):
group_id = row["group_id"]
user_id = row["user_id"]

preserve_fn(_renew_attestation)(group_id, user_id)
run_in_background(_renew_attestation, group_id, user_id)
5 changes: 4 additions & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)

results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
run_in_background(
self.appservice_api.query_3pe,
service, kind, protocol, fields,
)
for service in services
], consumeErrors=True))

Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
SynapseError, CodeMessageException, FederationDeniedError,
)
from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,7 +139,7 @@ def do_remote_query(destination):
failures[destination] = _exception_to_failure(e)

yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache
], consumeErrors=True))

Expand Down Expand Up @@ -242,7 +242,7 @@ def claim_client_keys(destination):
failures[destination] = _exception_to_failure(e)

yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
run_in_background(claim_client_keys, destination)
for destination in remote_queries
], consumeErrors=True))

Expand Down
16 changes: 10 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,8 @@ def backfill(self, dest, room_id, limit, extremities):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self.replication_layer.get_pdu)(
logcontext.run_in_background(
self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
Expand Down Expand Up @@ -1025,7 +1026,7 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.

logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
logcontext.run_in_background(self._handle_queued_pdus, room_queue)

defer.returnValue(True)

Expand Down Expand Up @@ -1527,8 +1528,9 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None,
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)

defer.returnValue((context, event_stream_id, max_stream_id))
Expand All @@ -1542,7 +1544,8 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self._prep_event)(
logcontext.run_in_background(
self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
Expand Down Expand Up @@ -1871,7 +1874,8 @@ def do_auth(self, origin, event, context, auth_events):

different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
logcontext.preserve_fn(self.store.get_event)(
logcontext.run_in_background(
self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
Expand Down
12 changes: 7 additions & 5 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler
Expand Down Expand Up @@ -166,7 +166,8 @@ def handle_room(event):
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
limit=limit,
end_token=room_end_token,
Expand Down Expand Up @@ -391,9 +392,10 @@ def get_receipts():

presence, receipts, (messages, token) = yield defer.gatherResults(
[
preserve_fn(get_presence)(),
preserve_fn(get_receipts)(),
preserve_fn(self.store.get_recent_events_for_room)(
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
self.store.get_recent_events_for_room,
room_id,
limit=limit,
end_token=now_token.room_key,
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.stringutils import random_string
Expand Down Expand Up @@ -857,7 +857,8 @@ def is_inviter_member_event(e):

# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)(
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)

Expand All @@ -872,7 +873,7 @@ def _notify():
except Exception:
logger.exception("Error notifying about new room event")

preserve_fn(_notify)()
run_in_background(_notify)

if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from twisted.internet import defer

from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
Expand Down Expand Up @@ -97,7 +97,8 @@ def _handle_timeouts(self):
if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
preserve_fn(self._push_remote)(
run_in_background(
self._push_remote,
member=member,
typing=True
)
Expand Down Expand Up @@ -196,7 +197,7 @@ def _stopped_typing(self, member):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
preserve_fn(self._push_remote)(member, typing)
run_in_background(self._push_remote, member, typing)

self._push_update_local(
member=member,
Expand Down
Loading

0 comments on commit aab2e4d

Please sign in to comment.