Skip to content

Commit

Permalink
Allow RabbitMQ BlockingChannel.basic_consume() to link with outgoing …
Browse files Browse the repository at this point in the history
…spans (#224)

* improved ignore path regex

* update test

* fix sw_psycopg2 register_type()

* fix complexity level

* fix psycopg2 register_type() second arg default

* fix rabbitmq BlockingChannel consume cb span link

* add BlockingChannel.consume() instrumentation
  • Loading branch information
tom-pytel authored Jul 18, 2022
1 parent 9ad0c54 commit 56f4efc
Showing 1 changed file with 88 additions and 4 deletions.
92 changes: 88 additions & 4 deletions skywalking/plugins/sw_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@

def install():
from pika.channel import Channel
from pika.adapters.blocking_connection import BlockingChannel

_basic_publish = Channel.basic_publish
__on_deliver = Channel._on_deliver
Channel.basic_publish = _sw_basic_publish_func(_basic_publish)
Channel._on_deliver = _sw__on_deliver_func(__on_deliver)
Channel.basic_publish = _sw_basic_publish_func(Channel.basic_publish)
Channel._on_deliver = _sw__on_deliver_func(Channel._on_deliver)
BlockingChannel.basic_consume = _sw_blocking_basic_consume_func(BlockingChannel.basic_consume)
BlockingChannel.basic_get = _sw_blocking_basic_get_func(BlockingChannel.basic_get)
BlockingChannel.consume = _sw_blocking_consume_func(BlockingChannel.consume)


def _sw_basic_publish_func(_basic_publish):
Expand Down Expand Up @@ -73,8 +75,32 @@ def _sw_basic_publish(this, exchange,


def _sw__on_deliver_func(__on_deliver):
from pika.adapters.blocking_connection import BlockingChannel

def _sw__on_deliver(this, method_frame, header_frame, body):
peer = f'{this.connection.params.host}:{this.connection.params.port}'
consumer_tag = method_frame.method.consumer_tag

# The following is a special case for one type of channel to allow any exit spans to be linked properly to the
# incoming segment. Otherwise, if we create the span here the span ends before any oser callbacks are called and
# so any new spans will not be linked to the incoming message.

defer_span = False

try: # future-proofing if object structure changes
if consumer_tag not in this._cancelled and consumer_tag in this._consumers:
consumer = this._consumers[consumer_tag]

if isinstance(consumer.__self__, BlockingChannel):
method_frame.method._sw_peer = peer
defer_span = True

except Exception:
pass

if defer_span:
return __on_deliver(this, method_frame, header_frame, body)

context = get_context()
exchange = method_frame.method.exchange
routing_key = method_frame.method.routing_key
Expand All @@ -96,3 +122,61 @@ def _sw__on_deliver(this, method_frame, header_frame, body):
span.tag(TagMqQueue(routing_key))

return _sw__on_deliver


def _sw_callback_func(callback):
def _sw_callback(this, method, properties, body):
peer = method._sw_peer
context = get_context()
exchange = method.exchange
routing_key = method.routing_key
carrier = Carrier()
for item in carrier:
try:
if item.key in properties.headers:
item.val = properties.headers[item.key]
except TypeError:
pass

with context.new_entry_span(op='RabbitMQ/Topic/' + exchange + '/Queue/' + routing_key
+ '/Consumer' or '', carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.RabbitmqConsumer
res = callback(this, method, properties, body)
span.tag(TagMqBroker(peer))
span.tag(TagMqTopic(exchange))
span.tag(TagMqQueue(routing_key))

return res

return _sw_callback


def _sw_blocking_basic_consume_func(func):
def _sw_basic_consume(this, queue, on_message_callback, *args, **kwargs):
return func(this, queue, _sw_callback_func(on_message_callback), *args, **kwargs)

return _sw_basic_consume


def _sw_blocking_basic_get_func(func):
def _sw_basic_get(this, queue, *args, **kwargs):
method, properties, body = func(this, queue, *args, **kwargs)

if method is not None:
_sw_callback_func(lambda *a: None)(this, method, properties, body)

return method, properties, body

return _sw_basic_get


def _sw_blocking_consume_func(func):
def _sw_consume(this, queue, *args, **kwargs):
for method, properties, body in func(this, queue, *args, **kwargs):
if method is not None:
_sw_callback_func(lambda *a: None)(this, method, properties, body)

yield method, properties, body

return _sw_consume

0 comments on commit 56f4efc

Please sign in to comment.