Skip to content

Commit

Permalink
Fix AttributeError when extracting tags from pika (#355)
Browse files Browse the repository at this point in the history
* Fix pika BlockingChannel AttributeError

* Remove duplicate span tag on pika consume

* Bump patch version

* Add test case for BlockingConnection

* Add test case for trace context

* Remove unused imports

* Refactor test cases for BlockingConnection

* Remove unused class attributes
  • Loading branch information
Ricardo Lopes authored Mar 22, 2022
1 parent 07e827f commit 14ab5ea
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 5 deletions.
3 changes: 1 addition & 2 deletions instana/instrumentation/pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def _extract_publisher_tags(span, conn, exchange, routing_key):
def _extract_consumer_tags(span, conn, queue):
_extract_broker_tags(span, conn)

span.set_tag("address", "%s:%d" % (conn.params.host, conn.params.port))
span.set_tag("sort", "consume")
span.set_tag("queue", queue)

Expand Down Expand Up @@ -119,7 +118,7 @@ def _cb_wrapper(channel, method, properties, body):
with tracer.start_active_span("rabbitmq", child_of=parent_span) as scope:
try:
_extract_consumer_tags(scope.span,
conn=instance.connection,
conn=instance.connection._impl,
queue=queue)
except:
logger.debug("basic_consume_with_instana: ", exc_info=True)
Expand Down
2 changes: 1 addition & 1 deletion instana/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

# Module version file. Used by setup.py and snapshot reporting.

VERSION = '1.37.0'
VERSION = '1.37.1'
98 changes: 96 additions & 2 deletions tests/clients/test_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

from __future__ import absolute_import

import os
import pika
import unittest
import mock
import threading
import time

from ..helpers import testenv
from instana.singletons import tracer


Expand Down Expand Up @@ -375,3 +373,99 @@ def __consume():
# A new span has been started
self.assertIsNotNone(rabbitmq_span.s)
self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s)


class TestPikaBlockingChannelBlockingConnection(_TestPika):
@mock.patch('pika.adapters.blocking_connection.BlockingConnection', autospec=True)
def _create_connection(self, connection=None):
connection._impl = mock.create_autospec(pika.connection.Connection)
connection._impl.params = pika.connection.Parameters()
return connection

@mock.patch('pika.channel.Channel', spec=pika.channel.Channel)
def _create_obj(self, channel_impl):
self.impl = channel_impl()
self.impl.channel_number = 1

return pika.adapters.blocking_connection.BlockingChannel(self.impl, self.connection)

def _generate_delivery(self, method, properties, body):
from pika.adapters.blocking_connection import _ConsumerDeliveryEvt
evt = _ConsumerDeliveryEvt(method, properties, body)
self.obj._add_pending_event(evt)
self.obj._dispatch_events()

def test_basic_consume(self):
consumer_tag = "test.consumer"

self.impl.basic_consume.return_value = consumer_tag
self.impl._generate_consumer_tag.return_value = consumer_tag

cb = mock.Mock()

self.obj.basic_consume(queue="test.queue", on_message_callback=cb)

body = "Hello!"
properties = pika.BasicProperties()
method = pika.spec.Basic.Deliver(consumer_tag)
self._generate_delivery(method, properties, body)

spans = self.recorder.queued_spans()
self.assertEqual(1, len(spans))

rabbitmq_span = spans[0]

self.assertIsNone(tracer.active_span)

# A new span has been started
self.assertIsNotNone(rabbitmq_span.t)
self.assertIsNone(rabbitmq_span.p)
self.assertIsNotNone(rabbitmq_span.s)

# Error logging
self.assertIsNone(rabbitmq_span.ec)

# Span tags
self.assertIsNone(rabbitmq_span.data["rabbitmq"]["exchange"])
self.assertEqual("consume", rabbitmq_span.data["rabbitmq"]["sort"])
self.assertIsNotNone(rabbitmq_span.data["rabbitmq"]["address"])
self.assertEqual("test.queue", rabbitmq_span.data["rabbitmq"]["queue"])
self.assertIsNotNone(rabbitmq_span.stack)
self.assertTrue(type(rabbitmq_span.stack) is list)
self.assertGreater(len(rabbitmq_span.stack), 0)

cb.assert_called_once_with(self.obj, method, properties, body)

def test_basic_consume_with_trace_context(self):
consumer_tag = "test.consumer"

self.impl.basic_consume.return_value = consumer_tag
self.impl._generate_consumer_tag.return_value = consumer_tag

cb = mock.Mock()

self.obj.basic_consume(queue="test.queue", on_message_callback=cb)

body = "Hello!"
properties = pika.BasicProperties(headers={
"X-INSTANA-T": "0000000000000001",
"X-INSTANA-S": "0000000000000002",
"X-INSTANA-L": "1"
})
method = pika.spec.Basic.Deliver(consumer_tag)
self._generate_delivery(method, properties, body)

spans = self.recorder.queued_spans()
self.assertEqual(1, len(spans))

rabbitmq_span = spans[0]

self.assertIsNone(tracer.active_span)

# Trace context propagation
self.assertEqual("0000000000000001", rabbitmq_span.t)
self.assertEqual("0000000000000002", rabbitmq_span.p)

# A new span has been started
self.assertIsNotNone(rabbitmq_span.s)
self.assertNotEqual(rabbitmq_span.p, rabbitmq_span.s)

0 comments on commit 14ab5ea

Please sign in to comment.