Skip to content

Commit

Permalink
Make SpanProcessor.on_start accept parent Context (#1251)
Browse files Browse the repository at this point in the history
* context from Tracer.start_span is passed through to the SpanProcessor
* fix linting issue in falcon test app when linting with eachdist script
* fix global error handler test as it read installed extensions
* reset global Configuration object state after tests were run
  • Loading branch information
mariojonke authored Oct 16, 2020
1 parent 5fc08ad commit 2d873e5
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 23 deletions.
3 changes: 3 additions & 0 deletions exporter/opentelemetry-exporter-datadog/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Make `SpanProcessor.on_start` accept parent Context
([#1251](https://github.com/open-telemetry/opentelemetry-python/pull/1251))

## Version 0.14b0

Released 2020-10-13
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import threading
import typing

from opentelemetry.context import attach, detach, set_value
from opentelemetry.context import Context, attach, detach, set_value
from opentelemetry.sdk.trace import Span, SpanProcessor
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.trace import INVALID_TRACE_ID
Expand Down Expand Up @@ -81,7 +81,9 @@ def __init__(
self.done = False
self.worker_thread.start()

def on_start(self, span: Span) -> None:
def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
ctx = span.get_span_context()
trace_id = ctx.trace_id

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from ddtrace.internal.writer import AgentWriter

from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.exporter import datadog
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import Resource, sampling
Expand Down Expand Up @@ -482,6 +483,21 @@ def test_span_processor_scheduled_delay(self):

tracer_provider.shutdown()

def test_span_processor_accepts_parent_context(self):
span_processor = mock.Mock(
wraps=datadog.DatadogExportSpanProcessor(self.exporter)
)
tracer_provider = trace.TracerProvider()
tracer_provider.add_span_processor(span_processor)
tracer = tracer_provider.get_tracer(__name__)

context = Context()
span = tracer.start_span("foo", context=context)

span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_origin(self):
context = trace_api.SpanContext(
trace_id=0x000000000000000000000000DEADBEEF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

class HelloWorldResource:
def _handle_request(self, _, resp):
# pylint: disable=no-member
resp.status = falcon.HTTP_201
resp.body = "Hello World"

Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Make `SpanProcessor.on_start` accept parent Context
([#1251](https://github.com/open-telemetry/opentelemetry-python/pull/1251))

## Version 0.14b0

Released 2020-10-13
Expand Down
48 changes: 35 additions & 13 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ class SpanProcessor:
in the same order as they were registered.
"""

def on_start(self, span: "Span") -> None:
def on_start(
self,
span: "Span",
parent_context: Optional[context_api.Context] = None,
) -> None:
"""Called when a :class:`opentelemetry.trace.Span` is started.
This method is called synchronously on the thread that starts the
span, therefore it should not block or throw an exception.
Args:
span: The :class:`opentelemetry.trace.Span` that just started.
parent_context: The parent context of the span that just started.
"""

def on_end(self, span: "Span") -> None:
Expand Down Expand Up @@ -124,9 +129,13 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
with self._lock:
self._span_processors = self._span_processors + (span_processor,)

def on_start(self, span: "Span") -> None:
def on_start(
self,
span: "Span",
parent_context: Optional[context_api.Context] = None,
) -> None:
for sp in self._span_processors:
sp.on_start(span)
sp.on_start(span, parent_context=parent_context)

def on_end(self, span: "Span") -> None:
for sp in self._span_processors:
Expand Down Expand Up @@ -192,17 +201,26 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
self._span_processors = self._span_processors + (span_processor,)

def _submit_and_await(
self, func: Callable[[SpanProcessor], Callable[..., None]], *args: Any
self,
func: Callable[[SpanProcessor], Callable[..., None]],
*args: Any,
**kwargs: Any
):
futures = []
for sp in self._span_processors:
future = self._executor.submit(func(sp), *args)
future = self._executor.submit(func(sp), *args, **kwargs)
futures.append(future)
for future in futures:
future.result()

def on_start(self, span: "Span") -> None:
self._submit_and_await(lambda sp: sp.on_start, span)
def on_start(
self,
span: "Span",
parent_context: Optional[context_api.Context] = None,
) -> None:
self._submit_and_await(
lambda sp: sp.on_start, span, parent_context=parent_context
)

def on_end(self, span: "Span") -> None:
self._submit_and_await(lambda sp: sp.on_end, span)
Expand Down Expand Up @@ -584,7 +602,11 @@ def add_event(
)
)

def start(self, start_time: Optional[int] = None) -> None:
def start(
self,
start_time: Optional[int] = None,
parent_context: Optional[context_api.Context] = None,
) -> None:
with self._lock:
if not self.is_recording():
return
Expand All @@ -596,7 +618,7 @@ def start(self, start_time: Optional[int] = None) -> None:
if has_started:
logger.warning("Calling start() on a started span.")
return
self.span_processor.on_start(self)
self.span_processor.on_start(self, parent_context=parent_context)

def end(self, end_time: Optional[int] = None) -> None:
with self._lock:
Expand Down Expand Up @@ -764,7 +786,7 @@ def start_span( # pylint: disable=too-many-locals
if sampling_result.decision.is_sampled()
else trace_api.TraceFlags(trace_api.TraceFlags.DEFAULT)
)
context = trace_api.SpanContext(
span_context = trace_api.SpanContext(
trace_id,
self.source.ids_generator.generate_span_id(),
is_remote=False,
Expand All @@ -777,7 +799,7 @@ def start_span( # pylint: disable=too-many-locals
# pylint:disable=protected-access
span = _Span(
name=name,
context=context,
context=span_context,
parent=parent_span_context,
sampler=self.source.sampler,
resource=self.source.resource,
Expand All @@ -788,9 +810,9 @@ def start_span( # pylint: disable=too-many-locals
instrumentation_info=self.instrumentation_info,
set_status_on_exception=set_status_on_exception,
)
span.start(start_time=start_time)
span.start(start_time=start_time, parent_context=context)
else:
span = trace_api.DefaultSpan(context=context)
span = trace_api.DefaultSpan(context=span_context)
return span

@contextmanager
Expand Down
10 changes: 7 additions & 3 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from enum import Enum

from opentelemetry.configuration import Configuration
from opentelemetry.context import attach, detach, set_value
from opentelemetry.context import Context, attach, detach, set_value
from opentelemetry.sdk.trace import Span, SpanProcessor
from opentelemetry.util import time_ns

Expand Down Expand Up @@ -70,7 +70,9 @@ class SimpleExportSpanProcessor(SpanProcessor):
def __init__(self, span_exporter: SpanExporter):
self.span_exporter = span_exporter

def on_start(self, span: Span) -> None:
def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
pass

def on_end(self, span: Span) -> None:
Expand Down Expand Up @@ -172,7 +174,9 @@ def __init__(
] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
self.worker_thread.start()

def on_start(self, span: Span) -> None:
def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
) -> None:
pass

def on_end(self, span: Span) -> None:
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/tests/error_handler/test_error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@


class TestErrorHandler(TestCase):
def test_default_error_handler(self):
@patch("opentelemetry.sdk.error_handler.iter_entry_points")
def test_default_error_handler(self, mock_iter_entry_points):

with self.assertLogs(logger, ERROR):
with GlobalErrorHandler():
Expand Down
41 changes: 41 additions & 0 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.configuration import Configuration
from opentelemetry.context import Context
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace import export

Expand Down Expand Up @@ -100,6 +102,23 @@ def test_simple_span_processor_no_context(self):

self.assertListEqual(["xxx", "bar", "foo"], spans_names_list)

def test_on_start_accepts_context(self):
# pylint: disable=no-self-use
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)

exporter = MySpanExporter([])
span_processor = mock.Mock(
wraps=export.SimpleExportSpanProcessor(exporter)
)
tracer_provider.add_span_processor(span_processor)

context = Context()
span = tracer.start_span("foo", context=context)
span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_simple_span_processor_not_sampled(self):
tracer_provider = trace.TracerProvider(
sampler=trace.sampling.ALWAYS_OFF
Expand Down Expand Up @@ -136,6 +155,11 @@ def _create_start_and_end_span(name, span_processor):


class TestBatchExportSpanProcessor(unittest.TestCase):
def tearDown(self) -> None:
# reset global state of configuration object
# pylint: disable=protected-access
Configuration._reset()

@mock.patch.dict(
"os.environ",
{
Expand All @@ -156,6 +180,23 @@ def test_batch_span_processor_environment_variables(self):
self.assertEqual(batch_span_processor.max_export_batch_size, 3)
self.assertEqual(batch_span_processor.export_timeout_millis, 4)

def test_on_start_accepts_parent_context(self):
# pylint: disable=no-self-use
my_exporter = MySpanExporter(destination=[])
span_processor = mock.Mock(
wraps=export.BatchExportSpanProcessor(my_exporter)
)
tracer_provider = trace.TracerProvider()
tracer_provider.add_span_processor(span_processor)
tracer = tracer_provider.get_tracer(__name__)

context = Context()
span = tracer.start_span("foo", context=context)

span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_shutdown(self):
spans_names_list = []

Expand Down
13 changes: 10 additions & 3 deletions opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import typing
import unittest
from threading import Event
from typing import Optional
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.sdk import trace


Expand All @@ -36,7 +38,9 @@ def __init__(self, name, span_list):
self.name = name
self.span_list = span_list

def on_start(self, span: "trace.Span") -> None:
def on_start(
self, span: "trace.Span", parent_context: Optional[Context] = None
) -> None:
self.span_list.append(span_event_start_fmt(self.name, span.name))

def on_end(self, span: "trace.Span") -> None:
Expand Down Expand Up @@ -160,10 +164,13 @@ def test_on_start(self):
multi_processor.add_span_processor(mock_processor)

span = self.create_default_span()
multi_processor.on_start(span)
context = Context()
multi_processor.on_start(span, parent_context=context)

for mock_processor in mocks:
mock_processor.on_start.assert_called_once_with(span)
mock_processor.on_start.assert_called_once_with(
span, parent_context=context
)
multi_processor.shutdown()

def test_on_end(self):
Expand Down
22 changes: 21 additions & 1 deletion opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import subprocess
import unittest
from logging import ERROR, WARNING
from typing import Optional
from unittest import mock

from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.sdk import resources, trace
from opentelemetry.sdk.trace import Resource, sampling
from opentelemetry.sdk.util import ns_to_iso_str
Expand Down Expand Up @@ -435,6 +437,8 @@ def test_disallow_direct_span_creation(self):


class TestSpan(unittest.TestCase):
# pylint: disable=too-many-public-methods

def setUp(self):
self.tracer = new_tracer()

Expand Down Expand Up @@ -734,6 +738,20 @@ def test_start_span(self):
)
self.assertIs(span.status.description, "Test description")

def test_start_accepts_context(self):
# pylint: disable=no-self-use
span_processor = mock.Mock(spec=trace.SpanProcessor)
span = trace._Span(
"name",
mock.Mock(spec=trace_api.SpanContext),
span_processor=span_processor,
)
context = Context()
span.start(parent_context=context)
span_processor.on_start.assert_called_once_with(
span, parent_context=context
)

def test_span_override_start_and_end_time(self):
"""Span sending custom start_time and end_time values"""
span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext))
Expand Down Expand Up @@ -899,7 +917,9 @@ def __init__(self, name, span_list):
self.name = name
self.span_list = span_list

def on_start(self, span: "trace.Span") -> None:
def on_start(
self, span: "trace.Span", parent_context: Optional[Context] = None
) -> None:
self.span_list.append(span_event_start_fmt(self.name, span.name))

def on_end(self, span: "trace.Span") -> None:
Expand Down

0 comments on commit 2d873e5

Please sign in to comment.