Skip to content

Commit

Permalink
Pika - add publish_hook and consume_hook
Browse files Browse the repository at this point in the history
  • Loading branch information
ItayGibel-helios committed Oct 21, 2021
1 parent 3049b4b commit b8e666b
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 17 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `opentelemetry-distro` uses the correct entrypoint name which was updated in the core release of 1.6.0 but the distro was not updated with it
([#755](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/755))

### Added
- `opentelemetry-instrumentation-pika` Add `publish_hook` and `consume_hook` callbacks passed as arguments to the instrument method
([#763](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/763))


## [1.6.1-0.25b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.1-0.25b1) - 2021-10-18

Expand Down
15 changes: 15 additions & 0 deletions instrumentation/opentelemetry-instrumentation-pika/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ Usage
PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)
* PikaInstrumentor also supports instrumenting with hooks that will be called when producing or consuming a message.
The hooks should be of type `Callable[[Span, bytes, BasicProperties], None]`
where the first parameter is the span, the second parameter is the message body
and the third parameter is the message properties

.. code-block:: python
def publish_hook(span: Span, body: bytes, properties: BasicProperties):
span.set_attribute("messaging.payload", body.decode())
def consume_hook(span: Span, body: bytes, properties: BasicProperties):
span.set_attribute("messaging.id", properties.message_id)
PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook)
References
----------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@
PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider)
* PikaInstrumentor also supports instrumenting with hooks that will be called when producing or consuming a message.
The hooks should be of type `Callable[[Span, bytes, BasicProperties], None]`
where the first parameter is the span, the second parameter is the message body
and the third parameter is the message properties
.. code-block:: python
def publish_hook(span: Span, body: bytes, properties: BasicProperties):
span.set_attribute("messaging.payload", body.decode())
def consume_hook(span: Span, body: bytes, properties: BasicProperties):
span.set_attribute("messaging.id", properties.message_id)
PikaInstrumentor.instrument_channel(channel, publish_hook=publish_hook, consume_hook=consume_hook)
API
---
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,41 @@ class PikaInstrumentor(BaseInstrumentor): # type: ignore
# pylint: disable=attribute-defined-outside-init
@staticmethod
def _instrument_consumers(
consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer
consumers_dict: Dict[str, Callable[..., Any]],
tracer: Tracer,
consume_hook: utils.HookT = utils.dummy_callback,
) -> Any:
for key, callback in consumers_dict.items():
decorated_callback = utils._decorate_callback(
callback, tracer, key
callback, tracer, key, consume_hook
)
setattr(decorated_callback, "_original_callback", callback)
consumers_dict[key] = decorated_callback

@staticmethod
def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None:
def _instrument_basic_publish(
channel: Channel,
tracer: Tracer,
publish_hook: utils.HookT = utils.dummy_callback,
) -> None:
original_function = getattr(channel, "basic_publish")
decorated_function = utils._decorate_basic_publish(
original_function, channel, tracer
original_function, channel, tracer, publish_hook
)
setattr(decorated_function, "_original_function", original_function)
channel.__setattr__("basic_publish", decorated_function)
channel.basic_publish = decorated_function

@staticmethod
def _instrument_channel_functions(
channel: Channel, tracer: Tracer
channel: Channel,
tracer: Tracer,
publish_hook: utils.HookT = utils.dummy_callback,
) -> None:
if hasattr(channel, "basic_publish"):
PikaInstrumentor._instrument_basic_publish(channel, tracer)
PikaInstrumentor._instrument_basic_publish(
channel, tracer, publish_hook
)

@staticmethod
def _uninstrument_channel_functions(channel: Channel) -> None:
Expand All @@ -74,7 +84,10 @@ def _uninstrument_channel_functions(channel: Channel) -> None:

@staticmethod
def instrument_channel(
channel: Channel, tracer_provider: Optional[TracerProvider] = None,
channel: Channel,
tracer_provider: Optional[TracerProvider] = None,
publish_hook: utils.HookT = utils.dummy_callback,
consume_hook: utils.HookT = utils.dummy_callback,
) -> None:
if not hasattr(channel, "_is_instrumented_by_opentelemetry"):
channel._is_instrumented_by_opentelemetry = False
Expand All @@ -89,10 +102,12 @@ def instrument_channel(
return
if channel._impl._consumers:
PikaInstrumentor._instrument_consumers(
channel._impl._consumers, tracer
channel._impl._consumers, tracer, consume_hook
)
PikaInstrumentor._decorate_basic_consume(channel, tracer)
PikaInstrumentor._instrument_channel_functions(channel, tracer)
PikaInstrumentor._decorate_basic_consume(channel, tracer, consume_hook)
PikaInstrumentor._instrument_channel_functions(
channel, tracer, publish_hook
)

@staticmethod
def uninstrument_channel(channel: Channel) -> None:
Expand All @@ -113,17 +128,29 @@ def uninstrument_channel(channel: Channel) -> None:
PikaInstrumentor._uninstrument_channel_functions(channel)

def _decorate_channel_function(
self, tracer_provider: Optional[TracerProvider]
self,
tracer_provider: Optional[TracerProvider],
publish_hook: utils.HookT = utils.dummy_callback,
consume_hook: utils.HookT = utils.dummy_callback,
) -> None:
def wrapper(wrapped, instance, args, kwargs):
channel = wrapped(*args, **kwargs)
self.instrument_channel(channel, tracer_provider=tracer_provider)
self.instrument_channel(
channel,
tracer_provider=tracer_provider,
publish_hook=publish_hook,
consume_hook=consume_hook,
)
return channel

wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper)

@staticmethod
def _decorate_basic_consume(channel, tracer: Optional[Tracer]) -> None:
def _decorate_basic_consume(
channel,
tracer: Optional[Tracer],
consume_hook: utils.HookT = utils.dummy_callback,
) -> None:
def wrapper(wrapped, instance, args, kwargs):
if not hasattr(channel, "_impl"):
_LOG.error(
Expand All @@ -141,7 +168,7 @@ def wrapper(wrapped, instance, args, kwargs):
new_key = new_key_list[0]
callback = channel._impl._consumers[new_key]
decorated_callback = utils._decorate_callback(
callback, tracer, new_key
callback, tracer, new_key, consume_hook
)
setattr(decorated_callback, "_original_callback", callback)
channel._impl._consumers[new_key] = decorated_callback
Expand All @@ -151,8 +178,19 @@ def wrapper(wrapped, instance, args, kwargs):

def _instrument(self, **kwargs: Dict[str, Any]) -> None:
tracer_provider: TracerProvider = kwargs.get("tracer_provider", None)
publish_hook: utils.HookT = kwargs.get(
"publish_hook", utils.dummy_callback
)
consume_hook: utils.HookT = kwargs.get(
"consume_hook", utils.dummy_callback
)

self.__setattr__("__opentelemetry_tracer_provider", tracer_provider)
self._decorate_channel_function(tracer_provider)
self._decorate_channel_function(
tracer_provider,
publish_hook=publish_hook,
consume_hook=consume_hook,
)

def _uninstrument(self, **kwargs: Dict[str, Any]) -> None:
if hasattr(self, "__opentelemetry_tracer_provider"):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from logging import getLogger
from typing import Any, Callable, List, Optional

from pika.channel import Channel
Expand All @@ -13,6 +14,8 @@
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.trace.span import Span

_LOG = getLogger(__name__)


class _PikaGetter(Getter): # type: ignore
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
Expand All @@ -27,11 +30,18 @@ def keys(self, carrier: CarrierT) -> List[str]:

_pika_getter = _PikaGetter()

HookT = Callable[[Span, bytes, BasicProperties], None]


def dummy_callback(span: Span, body: bytes, properties: BasicProperties):
...


def _decorate_callback(
callback: Callable[[Channel, Basic.Deliver, BasicProperties, bytes], Any],
tracer: Tracer,
task_name: str,
consume_hook: HookT = dummy_callback,
):
def decorated_callback(
channel: Channel,
Expand All @@ -56,6 +66,10 @@ def decorated_callback(
operation=MessagingOperationValues.RECEIVE,
)
with trace.use_span(span, end_on_exit=True):
try:
consume_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
retval = callback(channel, method, properties, body)
return retval

Expand All @@ -66,6 +80,7 @@ def _decorate_basic_publish(
original_function: Callable[[str, str, bytes, BasicProperties, bool], Any],
channel: Channel,
tracer: Tracer,
publish_hook: HookT = dummy_callback,
):
def decorated_function(
exchange: str,
Expand Down Expand Up @@ -95,6 +110,10 @@ def decorated_function(
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(properties.headers)
try:
publish_hook(span, body, properties)
except Exception as hook_exception: # pylint: disable=W0703
_LOG.exception(hook_exception)
retval = original_function(
exchange, routing_key, body, properties, mandatory
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from wrapt import BoundFunctionWrapper

from opentelemetry.instrumentation.pika import PikaInstrumentor
from opentelemetry.instrumentation.pika.utils import dummy_callback
from opentelemetry.trace import Tracer


Expand Down Expand Up @@ -71,7 +72,7 @@ def test_instrument_consumers(
) -> None:
tracer = mock.MagicMock(spec=Tracer)
expected_decoration_calls = [
mock.call(value, tracer, key)
mock.call(value, tracer, key, dummy_callback)
for key, value in self.channel._impl._consumers.items()
]
PikaInstrumentor._instrument_consumers(
Expand All @@ -95,7 +96,7 @@ def test_instrument_basic_publish(
original_function = self.channel.basic_publish
PikaInstrumentor._instrument_basic_publish(self.channel, tracer)
decorate_basic_publish.assert_called_once_with(
original_function, self.channel, tracer
original_function, self.channel, tracer, dummy_callback
)
self.assertEqual(
self.channel.basic_publish, decorate_basic_publish.return_value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,51 @@ def test_decorate_callback(
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.extract")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_callback_with_hook(
self,
use_span: mock.MagicMock,
extract: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
mock_task_name = "mock_task_name"
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
consume_hook = mock.MagicMock()

decorated_callback = utils._decorate_callback(
callback, tracer, mock_task_name, consume_hook
)
retval = decorated_callback(channel, method, properties, mock_body)
extract.assert_called_once_with(
properties.headers, getter=utils._pika_getter
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=mock_task_name,
ctx=extract.return_value,
operation=MessagingOperationValues.RECEIVE,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
consume_hook.assert_called_once_with(
get_span.return_value, mock_body, properties
)
callback.assert_called_once_with(
channel, method, properties, mock_body
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
Expand Down Expand Up @@ -284,3 +329,51 @@ def test_decorate_basic_publish_no_properties(
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish_with_hook(
self,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
mock_properties = mock.MagicMock()
mock_body = b"mock_body"
publish_hook = mock.MagicMock()

decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer, publish_hook
)
retval = decorated_basic_publish(
channel, method, mock_body, mock_properties
)
get_current.assert_called_once()
get_span.assert_called_once_with(
tracer,
channel,
mock_properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=get_current.return_value,
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(mock_properties.headers)
callback.assert_called_once_with(
channel, method, mock_body, mock_properties, False
)
publish_hook.assert_called_once_with(
get_span.return_value, mock_body, mock_properties
)
self.assertEqual(retval, callback.return_value)

0 comments on commit b8e666b

Please sign in to comment.