Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Opentrace device lists (#5853)
Browse files Browse the repository at this point in the history
  • Loading branch information
anoadragon453 committed Feb 25, 2020
2 parents 5a60a53 + a90d16d commit 2b3f54a
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 58 deletions.
1 change: 1 addition & 0 deletions changelog.d/5853.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Opentracing for device list updates.
65 changes: 63 additions & 2 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
HttpResponseException,
RequestSendFailed,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
Expand All @@ -45,6 +46,7 @@ def __init__(self, hs):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()

@trace
@defer.inlineCallbacks
def get_devices_by_user(self, user_id):
"""
Expand All @@ -56,6 +58,7 @@ def get_devices_by_user(self, user_id):
defer.Deferred: list[dict[str, X]]: info on each device
"""

set_tag("user_id", user_id)
device_map = yield self.store.get_devices_by_user(user_id)

ips = yield self.store.get_last_client_ip_by_device(user_id, device_id=None)
Expand All @@ -64,8 +67,10 @@ def get_devices_by_user(self, user_id):
for device in devices:
_update_device_from_client_ips(device, ips)

log_kv(device_map)
return devices

@trace
@defer.inlineCallbacks
def get_device(self, user_id, device_id):
""" Retrieve the given device
Expand All @@ -85,9 +90,14 @@ def get_device(self, user_id, device_id):
raise errors.NotFoundError
ips = yield self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)

set_tag("device", device)
set_tag("ips", ips)

return device

@measure_func("device.get_user_ids_changed")
@trace
@defer.inlineCallbacks
def get_user_ids_changed(self, user_id, from_token):
"""Get list of users that have had the devices updated, or have newly
Expand All @@ -97,6 +107,9 @@ def get_user_ids_changed(self, user_id, from_token):
user_id (str)
from_token (StreamToken)
"""

set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_key = yield self.store.get_room_events_max_id()

room_ids = yield self.store.get_rooms_for_user(user_id)
Expand Down Expand Up @@ -148,6 +161,9 @@ def get_user_ids_changed(self, user_id, from_token):
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
log_kv(
{"event": "encountered empty previous state", "room_id": room_id}
)
for key, event_id in iteritems(current_state_ids):
etype, state_key = key
if etype != EventTypes.Member:
Expand Down Expand Up @@ -200,7 +216,11 @@ def get_user_ids_changed(self, user_id, from_token):
possibly_joined = []
possibly_left = []

return {"changed": list(possibly_joined), "left": list(possibly_left)}
result = {"changed": list(possibly_joined), "left": list(possibly_left)}

log_kv(result)

return result


class DeviceHandler(DeviceWorkerHandler):
Expand Down Expand Up @@ -267,6 +287,7 @@ def check_device_registered(

raise errors.StoreError(500, "Couldn't generate a device ID.")

@trace
@defer.inlineCallbacks
def delete_device(self, user_id, device_id):
""" Delete the given device
Expand All @@ -284,6 +305,10 @@ def delete_device(self, user_id, device_id):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
log_kv(
{"reason": "User doesn't have device id.", "device_id": device_id}
)
pass
else:
raise
Expand All @@ -296,6 +321,7 @@ def delete_device(self, user_id, device_id):

yield self.notify_device_update(user_id, [device_id])

@trace
@defer.inlineCallbacks
def delete_all_devices_for_user(self, user_id, except_device_id=None):
"""Delete all of the user's devices
Expand Down Expand Up @@ -331,6 +357,8 @@ def delete_devices(self, user_id, device_ids):
except errors.StoreError as e:
if e.code == 404:
# no match
set_tag("error", True)
set_tag("reason", "User doesn't have that device id.")
pass
else:
raise
Expand Down Expand Up @@ -371,6 +399,7 @@ def update_device(self, user_id, device_id, content):
else:
raise

@trace
@measure_func("notify_device_update")
@defer.inlineCallbacks
def notify_device_update(self, user_id, device_ids):
Expand All @@ -386,6 +415,8 @@ def notify_device_update(self, user_id, device_ids):
hosts.update(get_domain_from_id(u) for u in users_who_share_room)
hosts.discard(self.server_name)

set_tag("target_hosts", hosts)

position = yield self.store.add_device_change_to_streams(
user_id, device_ids, list(hosts)
)
Expand All @@ -405,6 +436,7 @@ def notify_device_update(self, user_id, device_ids):
)
for host in hosts:
self.federation_sender.send_device_messages(host)
log_kv({"message": "sent device update to host", "host": host})

@defer.inlineCallbacks
def on_federation_query_user_devices(self, user_id):
Expand Down Expand Up @@ -451,12 +483,15 @@ def __init__(self, hs, device_handler):
iterable=True,
)

@trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
"""Called on incoming device list update from federation. Responsible
for parsing the EDU and adding to pending updates list.
"""

set_tag("origin", origin)
set_tag("edu_content", edu_content)
user_id = edu_content.pop("user_id")
device_id = edu_content.pop("device_id")
stream_id = str(edu_content.pop("stream_id")) # They may come as ints
Expand All @@ -471,12 +506,30 @@ def incoming_device_list_update(self, origin, edu_content):
device_id,
origin,
)

set_tag("error", True)
log_kv(
{
"message": "Got a device list update edu from a user and "
"device which does not match the origin of the request.",
"user_id": user_id,
"device_id": device_id,
}
)
return

room_ids = yield self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
set_tag("error", True)
log_kv(
{
"message": "Got an update from a user for which "
"we don't share any rooms",
"other user_id": user_id,
}
)
logger.warning(
"Got device list update edu for %r/%r, but don't share a room",
user_id,
Expand Down Expand Up @@ -578,6 +631,7 @@ def user_device_resync(self, user_id):
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
Expand All @@ -594,13 +648,20 @@ def user_device_resync(self, user_id):
# eventually become consistent.
return
except FederationDeniedError as e:
set_tag("error", True)
log_kv({"reason": "FederationDeniedError"})
logger.info(e)
return
except Exception:
except Exception as e:
# TODO: Remember that we are now out of sync and try again
# later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
return
log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]

Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from synapse.api.errors import SynapseError
from synapse.logging.opentracing import (
get_active_span_text_map,
log_kv,
set_tag,
start_active_span,
whitelisted_homeserver,
Expand Down Expand Up @@ -86,7 +87,8 @@ def on_direct_to_device_edu(self, origin, content):

@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):

set_tag("number_of_messages", len(messages))
set_tag("sender", sender_user_id)
local_messages = {}
remote_messages = {}
for user_id, by_device in messages.items():
Expand Down Expand Up @@ -124,6 +126,7 @@ def send_device_message(self, sender_user_id, message_type, messages):
else None,
}

log_kv({"local_messages": local_messages})
stream_id = yield self.store.add_messages_to_device_inbox(
local_messages, remote_edu_contents
)
Expand All @@ -132,6 +135,7 @@ def send_device_message(self, sender_user_id, message_type, messages):
"to_device_key", stream_id, users=local_messages.keys()
)

log_kv({"remote_messages": remote_messages})
for destination in remote_messages.keys():
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
Expand Down
70 changes: 17 additions & 53 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ def interesting_function(*args, **kwargs):
return something_usual_and_useful
Operation names can be explicitly set for functions by using
``trace_using_operation_name``
Operation names can be explicitly set for a function by passing the
operation name to ``trace``
.. code-block:: python
from synapse.logging.opentracing import trace_using_operation_name
from synapse.logging.opentracing import trace
@trace_using_operation_name("A *much* better operation name")
@trace(opname="a_better_operation_name")
def interesting_badly_named_function(*args, **kwargs):
# Does all kinds of cool and expected things
return something_usual_and_useful
Expand Down Expand Up @@ -641,66 +641,26 @@ def extract_text_map(carrier):
# Tracing decorators


def trace(func):
def trace(func=None, opname=None):
"""
Decorator to trace a function.
Sets the operation name to that of the function's.
Sets the operation name to that of the function's or that given
as operation_name. See the module's doc string for usage
examples.
"""
if opentracing is None:
return func

@wraps(func)
def _trace_inner(self, *args, **kwargs):
if opentracing is None:
return func(self, *args, **kwargs)

scope = start_active_span(func.__name__)
scope.__enter__()

try:
result = func(self, *args, **kwargs)
if isinstance(result, defer.Deferred):

def call_back(result):
scope.__exit__(None, None, None)
return result

def err_back(result):
scope.span.set_tag(tags.ERROR, True)
scope.__exit__(None, None, None)
return result

result.addCallbacks(call_back, err_back)

else:
scope.__exit__(None, None, None)

return result

except Exception as e:
scope.__exit__(type(e), None, e.__traceback__)
raise

return _trace_inner


def trace_using_operation_name(operation_name):
"""Decorator to trace a function. Explicitely sets the operation_name."""

def trace(func):
"""
Decorator to trace a function.
Sets the operation name to that of the function's.
"""
def decorator(func):
if opentracing is None:
return func

_opname = opname if opname else func.__name__

@wraps(func)
def _trace_inner(self, *args, **kwargs):
if opentracing is None:
return func(self, *args, **kwargs)

scope = start_active_span(operation_name)
scope = start_active_span(_opname)
scope.__enter__()

try:
Expand All @@ -717,6 +677,7 @@ def err_back(result):
return result

result.addCallbacks(call_back, err_back)

else:
scope.__exit__(None, None, None)

Expand All @@ -728,7 +689,10 @@ def err_back(result):

return _trace_inner

return trace
if func:
return decorator(func)
else:
return decorator


def tag_args(func):
Expand Down
4 changes: 2 additions & 2 deletions synapse/rest/client/v2_alpha/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
parse_json_object_from_request,
parse_string,
)
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.types import StreamToken

from ._base import client_patterns
Expand Down Expand Up @@ -69,7 +69,7 @@ def __init__(self, hs):
self.auth = hs.get_auth()
self.e2e_keys_handler = hs.get_e2e_keys_handler()

@trace_using_operation_name("upload_keys")
@trace(opname="upload_keys")
@defer.inlineCallbacks
def on_POST(self, request, device_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
Expand Down
Loading

0 comments on commit 2b3f54a

Please sign in to comment.