Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Various opentracing enhancements #11619

Merged
merged 8 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11619.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
A number of improvements to opentracing support.
53 changes: 37 additions & 16 deletions synapse/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from synapse.events import EventBase
from synapse.http import get_request_user_agent
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing as opentracing
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
from synapse.storage.databases.main.registration import TokenLookupResult
from synapse.types import Requester, StateMap, UserID, create_requester
from synapse.util.caches.lrucache import LruCache
Expand Down Expand Up @@ -149,6 +149,42 @@ async def get_user_by_req(
is invalid.
AuthError if access is denied for the user in the access token
"""
parent_span = active_span()
with start_active_span("get_user_by_req"):
requester = await self._wrapped_get_user_by_req(
request, allow_guest, rights, allow_expired
)

if parent_span:
if requester.authenticated_entity in self._force_tracing_for_users:
# request tracing is enabled for this user, so we need to force it
# tracing on for the parent span (which will be the servlet span).
#
# It's too late for the get_user_by_req span to inherit the setting,
# so we also force it on for that.
force_tracing()
force_tracing(parent_span)
parent_span.set_tag(
"authenticated_entity", requester.authenticated_entity
)
Comment on lines +167 to +169
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to convince myself that authenticated_entity was the same here:

  • For appservices authenticated_entity isn't set during creation of the requester and we report the tag as user_id; this was essentially inlined logic from create_requester
  • For non-appservices, this was set to user_info.token_owner, which is the same thing that is passed as the authenticated_entity in create_requester.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, yes, I must have gone through that same thought process, but forgot to write it down here!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! Definitely better to put all this logic in one place though!

parent_span.set_tag("user_id", requester.user.to_string())
if requester.device_id is not None:
parent_span.set_tag("device_id", requester.device_id)
if requester.app_service is not None:
parent_span.set_tag("appservice_id", requester.app_service.id)
return requester

async def _wrapped_get_user_by_req(
self,
request: SynapseRequest,
allow_guest: bool,
rights: str,
allow_expired: bool,
) -> Requester:
"""Helper for get_user_by_req

Once get_user_by_req has set up the opentracing span, this does the actual work.
"""
try:
ip_addr = request.getClientIP()
user_agent = get_request_user_agent(request)
Expand Down Expand Up @@ -177,14 +213,6 @@ async def get_user_by_req(
)

request.requester = user_id
if user_id in self._force_tracing_for_users:
opentracing.force_tracing()
opentracing.set_tag("authenticated_entity", user_id)
opentracing.set_tag("user_id", user_id)
if device_id is not None:
opentracing.set_tag("device_id", device_id)
opentracing.set_tag("appservice_id", app_service.id)

return requester

user_info = await self.get_user_by_access_token(
Expand Down Expand Up @@ -242,13 +270,6 @@ async def get_user_by_req(
)

request.requester = requester
if user_info.token_owner in self._force_tracing_for_users:
opentracing.force_tracing()
opentracing.set_tag("authenticated_entity", user_info.token_owner)
opentracing.set_tag("user_id", user_info.user_id)
if device_id:
opentracing.set_tag("device_id", device_id)

return requester
except KeyError:
raise MissingClientTokenError()
Expand Down
7 changes: 4 additions & 3 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async def current_sync_for_user(
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.
"""
with start_active_span("current_sync_for_user"):
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
Expand Down Expand Up @@ -1585,7 +1585,8 @@ async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
)
logger.debug("Generated room entry for %s", room_entry.room_id)

await concurrently_execute(handle_room_entries, room_entries, 10)
with start_active_span("sync.generate_room_entries"):
await concurrently_execute(handle_room_entries, room_entries, 10)

sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)
Expand Down Expand Up @@ -2045,7 +2046,7 @@ async def _generate_room_entry(
since_token = room_builder.since_token
upto_token = room_builder.upto_token

with start_active_span("generate_room_entry"):
with start_active_span("sync.generate_room_entry"):
set_tag("room_id", room_id)
log_kv({"events": len(events or ())})

Expand Down
19 changes: 17 additions & 2 deletions synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import trace_servlet
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq

if TYPE_CHECKING:
import opentracing

from synapse.server import HomeServer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -759,7 +761,20 @@ async def _async_write_json_to_request_in_thread(
expensive.
"""

json_str = await defer_to_thread(request.reactor, json_encoder, json_object)
def encode(opentracing_span: "Optional[opentracing.Span]") -> bytes:
# it might take a while for the threadpool to schedule us, so we write
# opentracing logs once we actually get scheduled, so that we can see how
# much that contributed.
if opentracing_span:
opentracing_span.log_kv({"event": "scheduled"})
res = json_encoder(json_object)
if opentracing_span:
opentracing_span.log_kv({"event": "encoded"})
return res

with start_active_span("encode_json_response"):
span = active_span()
Comment on lines +775 to +776
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We seem to be using this same logic in a bunch of places, it looks similar to the trace decorator, but not the same. It feels really duplicative, but maybe that's just open tracing?

(I guess in this case we need the span itself so we can call log_kv on it...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, it's less opentracing, and more our wrappers for opentracing (which make opentracing itself an optional dependency). If you have a real opentracing, you can do:

    with start_active_span("encode_json_response") as scope:
        span = scope.span
        span.log_kv(...)

... but with our wrappers, scope can be None, so you need the extra active_span() call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, OK! That makes a bit more sense!

json_str = await defer_to_thread(request.reactor, encode, span)

_write_bytes_to_request(request, json_str)

Expand Down
6 changes: 6 additions & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace
from synapse.types import JsonDict, StreamToken
from synapse.util import json_decoder

Expand Down Expand Up @@ -222,6 +223,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
logger.debug("Event formatting complete")
return 200, response_content

@trace(opname="sync.encode_response")
async def encode_response(
self,
time_now: int,
Expand Down Expand Up @@ -332,6 +334,7 @@ def encode_presence(events: List[UserPresenceState], time_now: int) -> JsonDict:
]
}

@trace(opname="sync.encode_joined")
async def encode_joined(
self,
rooms: List[JoinedSyncResult],
Expand Down Expand Up @@ -368,6 +371,7 @@ async def encode_joined(

return joined

@trace(opname="sync.encode_invited")
async def encode_invited(
self,
rooms: List[InvitedSyncResult],
Expand Down Expand Up @@ -406,6 +410,7 @@ async def encode_invited(

return invited

@trace(opname="sync.encode_knocked")
async def encode_knocked(
self,
rooms: List[KnockedSyncResult],
Expand Down Expand Up @@ -460,6 +465,7 @@ async def encode_knocked(

return knocked

@trace(opname="sync.encode_archived")
async def encode_archived(
self,
rooms: List[ArchivedSyncResult],
Expand Down