Skip to content

Commit

Permalink
aio-pika instrumentation: Removed check for non-sampled span when inj…
Browse files Browse the repository at this point in the history
…ect message header. (#1969)

* aio-pika instrumentation: Removed check for non-sampled span when inject message headers. Reason to change is that sampled flag can be propagate https://www.w3.org/TR/trace-context/#sampled-flag and be useful when trace is not sampled.

* black formting

---------

Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com>
  • Loading branch information
nesb1 and shalevr authored Nov 27, 2023
1 parent 1b1c38d commit b29682b
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- `opentelemetry-instrumentation-aio-pika` and `opentelemetry-instrumentation-pika` Fix missing trace context propagation when trace not recording.
([#1969](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1969))
- Fix version of Flask dependency `werkzeug`
([#1980](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1980))
- `opentelemetry-resource-detector-azure` Using new Cloud Resource ID attribute.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ async def decorated_publish(
if not span:
return await publish(message, routing_key, **kwargs)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(message.properties.headers)
propagate.inject(message.properties.headers)
return_value = await publish(message, routing_key, **kwargs)
return return_value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import asyncio
from typing import Type
from unittest import TestCase, mock, skipIf
from unittest.mock import MagicMock

from aio_pika import Exchange, RobustExchange

Expand Down Expand Up @@ -92,6 +93,36 @@ def test_publish(self):
def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject"
) as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)


@skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8")
class TestInstrumentedExchangeAioRmq8(TestCase):
Expand Down Expand Up @@ -144,3 +175,33 @@ def test_publish(self):

def test_robust_publish(self):
self._test_publish(RobustExchange)

def _test_publish_works_with_not_recording_span(self, exchange_type):
exchange = exchange_type(CONNECTION_7, CHANNEL_7, EXCHANGE_NAME)
with mock.patch.object(
PublishDecorator, "_get_publish_span"
) as mock_get_publish_span:
mocked_not_recording_span = MagicMock()
mocked_not_recording_span.is_recording.return_value = False
mock_get_publish_span.return_value = mocked_not_recording_span
with mock.patch.object(
Exchange, "publish", return_value=asyncio.sleep(0)
) as mock_publish:
with mock.patch(
"opentelemetry.instrumentation.aio_pika.publish_decorator.propagate.inject"
) as mock_inject:
decorated_publish = PublishDecorator(
self.tracer, exchange
).decorate(mock_publish)
self.loop.run_until_complete(
decorated_publish(MESSAGE, ROUTING_KEY)
)
mock_publish.assert_called_once()
mock_get_publish_span.assert_called_once()
mock_inject.assert_called_once()

def test_publish_works_with_not_recording_span(self):
self._test_publish_works_with_not_recording_span(Exchange)

def test_publish_works_with_not_recording_span_robust(self):
self._test_publish_works_with_not_recording_span(RobustExchange)
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,11 @@ def decorated_function(
exchange, routing_key, body, properties, mandatory
)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ def test_decorate_basic_publish(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
Expand Down Expand Up @@ -323,7 +322,6 @@ def test_decorate_basic_publish_no_properties(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)

Expand Down Expand Up @@ -393,7 +391,55 @@ def test_decorate_basic_publish_with_hook(
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
)
callback.assert_called_once_with(
exchange_name, routing_key, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish_when_span_is_not_recording(
self,
use_span: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
exchange_name = "test-exchange"
routing_key = "test-routing-key"
properties = mock.MagicMock()
mock_body = b"mock_body"
publish_hook = mock.MagicMock()

mocked_span = mock.MagicMock()
mocked_span.is_recording.return_value = False
get_span.return_value = mocked_span

decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer, publish_hook
)
retval = decorated_basic_publish(
exchange_name, routing_key, mock_body, properties
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
destination=exchange_name,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
inject.assert_called_once_with(properties.headers)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
Expand Down

0 comments on commit b29682b

Please sign in to comment.