Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'develop' into py3-xrange-1
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh authored Apr 30, 2018
2 parents d82b6ea + 2fd9672 commit db75c86
Show file tree
Hide file tree
Showing 42 changed files with 161 additions and 121 deletions.
4 changes: 2 additions & 2 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
Expand Down Expand Up @@ -229,7 +229,7 @@ def process_replication_rows(self, stream_name, token, rows):
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token)
run_in_background(self.update_token, token)

# We also need to poke the federation sender when new events happen
elif stream_name == "events":
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
Expand Down Expand Up @@ -140,7 +140,7 @@ def __init__(self, hs):

def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
preserve_fn(self.poke_pushers)(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)

@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
Expand Down
5 changes: 2 additions & 3 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
Expand Down Expand Up @@ -327,8 +327,7 @@ def __init__(self, hs):

def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)

preserve_fn(self.process_and_notify)(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)

def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
Expand Down
12 changes: 6 additions & 6 deletions synapse/appservice/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from twisted.internet import defer

from synapse.appservice import ApplicationServiceState
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure

import logging
Expand Down Expand Up @@ -106,7 +106,7 @@ def __init__(self, txn_ctrl, clock):
def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
preserve_fn(self._send_request)(service)
run_in_background(self._send_request, service)

@defer.inlineCallbacks
def _send_request(self, service):
Expand Down Expand Up @@ -152,10 +152,10 @@ def send(self, service, events):
if sent:
yield txn.complete(self.store)
else:
preserve_fn(self._start_recoverer)(service)
except Exception as e:
logger.exception(e)
preserve_fn(self._start_recoverer)(service)
run_in_background(self._start_recoverer, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)

@defer.inlineCallbacks
def on_recovered(self, recoverer):
Expand Down
6 changes: 3 additions & 3 deletions synapse/config/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,15 @@ def load_or_generate_config(cls, description, argv):
)
if not cls.path_exists(config_dir_path):
os.makedirs(config_dir_path)
with open(config_path, "wb") as config_file:
config_bytes, config = obj.generate_config(
with open(config_path, "w") as config_file:
config_str, config = obj.generate_config(
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
is_generating_file=True
)
obj.invoke_all("generate_files", config)
config_file.write(config_bytes)
config_file.write(config_str)
print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"
Expand Down
4 changes: 2 additions & 2 deletions synapse/config/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
from synapse.appservice import ApplicationService
from synapse.types import UserID

import urllib
import yaml
import logging

from six import string_types
from six.moves.urllib import parse as urlparse

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -105,7 +105,7 @@ def _load_appservice(hostname, as_info, config_filename):
)

localpart = as_info["sender_localpart"]
if urllib.quote(localpart) != localpart:
if urlparse.quote(localpart) != localpart:
raise ValueError(
"sender_localpart needs characters which are not URL encoded."
)
Expand Down
2 changes: 1 addition & 1 deletion synapse/config/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def generate_files(self, config):
log_config = config.get("log_config")
if log_config and not os.path.exists(log_config):
log_file = self.abspath("homeserver.log")
with open(log_config, "wb") as log_config_file:
with open(log_config, "w") as log_config_file:
log_config_file.write(
DEFAULT_LOG_CONFIG.substitute(log_file=log_file)
)
Expand Down
4 changes: 2 additions & 2 deletions synapse/config/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def generate_files(self, config):
tls_dh_params_path = config["tls_dh_params_path"]

if not self.path_exists(tls_private_key_path):
with open(tls_private_key_path, "w") as private_key_file:
with open(tls_private_key_path, "wb") as private_key_file:
tls_private_key = crypto.PKey()
tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
private_key_pem = crypto.dump_privatekey(
Expand All @@ -148,7 +148,7 @@ def generate_files(self, config):
)

if not self.path_exists(tls_certificate_path):
with open(tls_certificate_path, "w") as certificate_file:
with open(tls_certificate_path, "wb") as certificate_file:
cert = crypto.X509()
subject = cert.get_subject()
subject.CN = config["server_name"]
Expand Down
28 changes: 17 additions & 11 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
PreserveLoggingContext,
preserve_fn
preserve_fn,
run_in_background,
)
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -127,7 +128,7 @@ def verify_json_objects_for_server(self, server_and_json):

verify_requests.append(verify_request)

preserve_fn(self._start_key_lookups)(verify_requests)
run_in_background(self._start_key_lookups, verify_requests)

# Pass those keys to handle_key_deferred so that the json object
# signatures can be verified
Expand Down Expand Up @@ -316,7 +317,7 @@ def on_err(err):
if not verify_request.deferred.called:
verify_request.deferred.errback(err)

preserve_fn(do_iterations)().addErrback(on_err)
run_in_background(do_iterations).addErrback(on_err)

@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
Expand All @@ -332,8 +333,9 @@ def get_keys_from_store(self, server_name_and_key_ids):
"""
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
run_in_background(
self.store.get_server_verify_keys,
server_name, key_ids,
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
Expand Down Expand Up @@ -361,7 +363,7 @@ def get_key(perspective_name, perspective_keys):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(p_name, p_keys)
run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
Expand Down Expand Up @@ -401,7 +403,7 @@ def get_key(server_name, key_ids):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(server_name, key_ids)
run_in_background(get_key, server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
Expand Down Expand Up @@ -484,7 +486,8 @@ def get_server_verify_key_v2_indirect(self, server_names_and_key_ids,

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
Expand Down Expand Up @@ -542,7 +545,8 @@ def get_server_verify_key_v2_direct(self, server_name, key_ids):

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=key_server_name,
from_server=server_name,
verify_keys=verify_keys,
Expand Down Expand Up @@ -618,7 +622,8 @@ def process_v2_response(self, from_server, response_json,

yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=server_name,
Expand Down Expand Up @@ -719,7 +724,8 @@ def store_keys(self, server_name, from_server, verify_keys):
# TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_verify_key)(
run_in_background(
self.store.store_server_verify_key,
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()
Expand Down
5 changes: 3 additions & 2 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination

Expand Down Expand Up @@ -419,7 +419,8 @@ def random_server_list():
batch = set(missing_events[i:i + batch_size])

deferreds = [
preserve_fn(self.get_pdu)(
run_in_background(
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
)
Expand Down
4 changes: 2 additions & 2 deletions synapse/groups/attestations.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background

from signedjson.sign import sign_json

Expand Down Expand Up @@ -196,4 +196,4 @@ def _renew_attestation(group_id, user_id):
group_id = row["group_id"]
user_id = row["user_id"]

preserve_fn(_renew_attestation)(group_id, user_id)
run_in_background(_renew_attestation, group_id, user_id)
5 changes: 4 additions & 1 deletion synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)

results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
run_in_background(
self.appservice_api.query_3pe,
service, kind, protocol, fields,
)
for service in services
], consumeErrors=True))

Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
SynapseError, CodeMessageException, FederationDeniedError,
)
from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,7 +139,7 @@ def do_remote_query(destination):
failures[destination] = _exception_to_failure(e)

yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache
], consumeErrors=True))

Expand Down Expand Up @@ -242,7 +242,7 @@ def claim_client_keys(destination):
failures[destination] = _exception_to_failure(e)

yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
run_in_background(claim_client_keys, destination)
for destination in remote_queries
], consumeErrors=True))

Expand Down
16 changes: 10 additions & 6 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,8 @@ def backfill(self, dest, room_id, limit, extremities):

results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self.replication_layer.get_pdu)(
logcontext.run_in_background(
self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
Expand Down Expand Up @@ -1025,7 +1026,7 @@ def do_invite_join(self, target_hosts, room_id, joinee, content):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.

logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
logcontext.run_in_background(self._handle_queued_pdus, room_queue)

defer.returnValue(True)

Expand Down Expand Up @@ -1527,8 +1528,9 @@ def _handle_new_event(self, origin, event, state=None, auth_events=None,
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)

defer.returnValue((context, event_stream_id, max_stream_id))
Expand All @@ -1542,7 +1544,8 @@ def _handle_new_events(self, origin, event_infos, backfilled=False):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self._prep_event)(
logcontext.run_in_background(
self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
Expand Down Expand Up @@ -1871,7 +1874,8 @@ def do_auth(self, origin, event, context, auth_events):

different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
logcontext.preserve_fn(self.store.get_event)(
logcontext.run_in_background(
self.store.get_event,
d,
allow_none=True,
allow_rejected=False,
Expand Down
Loading

0 comments on commit db75c86

Please sign in to comment.