Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve opentracing for incoming HTTP requests (#11618)
Browse files Browse the repository at this point in the history
* remove `start_active_span_from_request`

Instead, pull out a separate function, `span_context_from_request`, to extract
the parent span, which we can then pass into `start_active_span` as
normal. This seems to be clearer all round.

* Remove redundant tags from `incoming-federation-request`

These are all wrapped up inside a parent span generated in AsyncResource, so
there's no point duplicating all the tags that are set there.

* Leave request spans open until the request completes

It may take some time for the response to be encoded into JSON, and that JSON
to be streamed back to the client, and really we want that inside the top-level
span, so let's hand responsibility for closure to the SynapseRequest.

* opentracing logs for HTTP request events

* changelog
  • Loading branch information
richvdh authored Dec 20, 2021
1 parent 8e4083e commit 60fa493
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 73 deletions.
1 change: 1 addition & 0 deletions changelog.d/11618.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve opentracing support for incoming HTTP requests.
39 changes: 13 additions & 26 deletions synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
SynapseTags,
start_active_span,
start_active_span_from_request,
tags,
set_tag,
span_context_from_request,
start_active_span_follows_from,
whitelisted_homeserver,
)
from synapse.server import HomeServer
Expand Down Expand Up @@ -279,30 +277,19 @@ async def new_func(
logger.warning("authenticate_request failed: %s", e)
raise

request_tags = {
SynapseTags.REQUEST_ID: request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
}

# Only accept the span context if the origin is authenticated
# and whitelisted
# update the active opentracing span with the authenticated entity
set_tag("authenticated_entity", origin)

# if the origin is authenticated and whitelisted, link to its span context
context = None
if origin and whitelisted_homeserver(origin):
scope = start_active_span_from_request(
request, "incoming-federation-request", tags=request_tags
)
else:
scope = start_active_span(
"incoming-federation-request", tags=request_tags
)
context = span_context_from_request(request)

with scope:
opentracing.inject_response_headers(request.responseHeaders)
scope = start_active_span_follows_from(
"incoming-federation-request", contexts=(context,) if context else ()
)

with scope:
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
Expand Down
30 changes: 29 additions & 1 deletion synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import contextlib
import logging
import time
from typing import Any, Generator, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Generator, Optional, Tuple, Union

import attr
from zope.interface import implementer
Expand All @@ -35,6 +35,9 @@
)
from synapse.types import Requester

if TYPE_CHECKING:
import opentracing

logger = logging.getLogger(__name__)

_next_request_seq = 0
Expand Down Expand Up @@ -81,6 +84,10 @@ def __init__(
# server name, for client requests this is the Requester object.
self._requester: Optional[Union[Requester, str]] = None

# An opentracing span for this request. Will be closed when the request is
# completely processed.
self._opentracing_span: "Optional[opentracing.Span]" = None

# we can't yet create the logcontext, as we don't know the method.
self.logcontext: Optional[LoggingContext] = None

Expand Down Expand Up @@ -148,6 +155,13 @@ def requester(self, value: Union[Requester, str]) -> None:
# If there's no authenticated entity, it was the requester.
self.logcontext.request.authenticated_entity = authenticated_entity or requester

def set_opentracing_span(self, span: "opentracing.Span") -> None:
"""attach an opentracing span to this request
Doing so will cause the span to be closed when we finish processing the request
"""
self._opentracing_span = span

def get_request_id(self) -> str:
return "%s-%i" % (self.get_method(), self.request_seq)

Expand Down Expand Up @@ -286,6 +300,9 @@ async def handle_request(request):
self._processing_finished_time = time.time()
self._is_processing = False

if self._opentracing_span:
self._opentracing_span.log_kv({"event": "finished processing"})

# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
if self.finish_time is not None:
Expand All @@ -299,6 +316,8 @@ def finish(self) -> None:
"""
self.finish_time = time.time()
Request.finish(self)
if self._opentracing_span:
self._opentracing_span.log_kv({"event": "response sent"})
if not self._is_processing:
assert self.logcontext is not None
with PreserveLoggingContext(self.logcontext):
Expand Down Expand Up @@ -333,6 +352,11 @@ def connectionLost(self, reason: Union[Failure, Exception]) -> None:
with PreserveLoggingContext(self.logcontext):
logger.info("Connection from client lost before response was sent")

if self._opentracing_span:
self._opentracing_span.log_kv(
{"event": "client connection lost", "reason": str(reason.value)}
)

if not self._is_processing:
self._finished_processing()

Expand Down Expand Up @@ -421,6 +445,10 @@ def _finished_processing(self) -> None:
usage.evt_db_fetch_count,
)

# complete the opentracing span, if any.
if self._opentracing_span:
self._opentracing_span.finish()

try:
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
Expand Down
68 changes: 22 additions & 46 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
import attr

from twisted.internet import defer
from twisted.web.http import Request
from twisted.web.http_headers import Headers

from synapse.config import ConfigError
Expand Down Expand Up @@ -490,48 +491,6 @@ def start_active_span_follows_from(
return scope


def start_active_span_from_request(
request,
operation_name,
references=None,
tags=None,
start_time=None,
ignore_active_span=False,
finish_on_close=True,
):
"""
Extracts a span context from a Twisted Request.
args:
headers (twisted.web.http.Request)
For the other args see opentracing.tracer
returns:
span_context (opentracing.span.SpanContext)
"""
# Twisted encodes the values as lists whereas opentracing doesn't.
# So, we take the first item in the list.
# Also, twisted uses byte arrays while opentracing expects strings.

if opentracing is None:
return noop_context_manager() # type: ignore[unreachable]

header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)

return opentracing.tracer.start_active_span(
operation_name,
child_of=context,
references=references,
tags=tags,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
)


def start_active_span_from_edu(
edu_content,
operation_name,
Expand Down Expand Up @@ -743,6 +702,20 @@ def active_span_context_as_string():
return json_encoder.encode(carrier)


def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]":
"""Extract an opentracing context from the headers on an HTTP request
This is useful when we have received an HTTP request from another part of our
system, and want to link our spans to those of the remote system.
"""
if not opentracing:
return None
header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)


@only_if_tracing
def span_context_from_string(carrier):
"""
Expand Down Expand Up @@ -882,10 +855,13 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
}

request_name = request.request_metrics.name
if extract_context:
scope = start_active_span_from_request(request, request_name)
else:
scope = start_active_span(request_name)
context = span_context_from_request(request) if extract_context else None

# we configure the scope not to finish the span immediately on exit, and instead
# pass the span into the SynapseRequest, which will finish it once we've finished
# sending the response to the client.
scope = start_active_span(request_name, child_of=context, finish_on_close=False)
request.set_opentracing_span(scope.span)

with scope:
inject_response_headers(request.responseHeaders)
Expand Down

0 comments on commit 60fa493

Please sign in to comment.