Skip to content

Commit

Permalink
Add server.address attribute to RabbitMQ (#1186)
Browse files Browse the repository at this point in the history
* Add service.address to rabbitmq span attrs.

* Update pika tests to test span events and attrs

* Add instance info unit tests for pika

* Formatting

* Remove implicit string concatenation

* Fix more linter issues

* Trigger tests

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: Hannah Stepanek <hstepanek@newrelic.com>
  • Loading branch information
3 people authored Aug 8, 2024
1 parent 6c870ec commit 4e2dd16
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 33 deletions.
11 changes: 6 additions & 5 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
"aws.lambda.eventSource.arn",
"aws.operation",
"aws.requestId",
"cloud.account.id",
"cloud.region",
"code.filepath",
"code.function",
"code.lineno",
Expand All @@ -59,8 +61,8 @@
"enduser.id",
"error.class",
"error.expected",
"error.message",
"error.group.name",
"error.message",
"graphql.field.name",
"graphql.field.parentType",
"graphql.field.path",
Expand All @@ -74,6 +76,8 @@
"llm",
"message.queueName",
"message.routingKey",
"messaging.destination.name",
"messaging.system",
"peer.address",
"peer.hostname",
"request.headers.accept",
Expand All @@ -86,10 +90,7 @@
"response.headers.contentLength",
"response.headers.contentType",
"response.status",
"messaging.system",
"cloud.region",
"cloud.account.id",
"messaging.destination.name",
"server.address",
)
)

Expand Down
60 changes: 55 additions & 5 deletions newrelic/hooks/messagebroker_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,25 @@
wrap_object,
)

_START_KEY = "_nr_start_time"
KWARGS_ERROR = "Supportability/hooks/pika/kwargs_error"


def _add_consume_rabbitmq_trace(transaction, method, properties, nr_start_time, queue_name=None):
def instance_info(channel):
# Only host is currently used, so we only extract that.
try:
connection = channel.connection
if not hasattr(connection, "params") and hasattr(connection, "_impl"):
# Check for _impl attribute used by BlockingConnection to wrap actual connection objects
connection = connection._impl

host = connection.params.host
except Exception:
host = None

return host


def _add_consume_rabbitmq_trace(transaction, method, properties, nr_start_time, queue_name=None, channel=None):
routing_key = None
if hasattr(method, "routing_key"):
routing_key = method.routing_key
Expand Down Expand Up @@ -79,7 +93,16 @@ def _add_consume_rabbitmq_trace(transaction, method, properties, nr_start_time,
params=params,
)
trace.__enter__()

# Set start time and attributes after trace has started
trace.start_time = nr_start_time

# Extract host from channel to add as an agent attribute
host = instance_info(channel)
if trace and host:
trace._add_agent_attribute("server.address", host)

# Exit trace immediately and complete
trace.__exit__(None, None, None)


Expand Down Expand Up @@ -126,9 +149,15 @@ def _nr_wrapper_basic_publish(wrapped, instance, args, kwargs):
destination_name=exchange or "Default",
params=params,
source=wrapped,
):
) as trace:
cat_headers = MessageTrace.generate_request_headers(transaction)
properties.headers.update(cat_headers)

# Extract host from channel to add as an agent attribute
host = instance_info(instance)
if trace and host:
trace._add_agent_attribute("server.address", host)

return wrapped(*args, **kwargs)


Expand All @@ -144,9 +173,15 @@ def callback_wrapper(callback, _instance, _args, _kwargs):
if not _kwargs:
method, properties = _args[1:3]
start_time = getattr(callback_wrapper, "_nr_start_time", None)
channel = getattr(callback_wrapper, "_nr_channel", None)

_add_consume_rabbitmq_trace(
transaction, method=method, properties=properties, nr_start_time=start_time, queue_name=queue
transaction,
method=method,
properties=properties,
nr_start_time=start_time,
queue_name=queue,
channel=channel,
)
else:
m = transaction._transaction_metrics.get(KWARGS_ERROR, 0)
Expand All @@ -155,7 +190,12 @@ def callback_wrapper(callback, _instance, _args, _kwargs):
name = callable_name(callback)
return FunctionTraceWrapper(callback, name=name)(*_args, **_kwargs)

callback_wrapper._nr_start_time = time.time()
try:
callback_wrapper._nr_start_time = time.time()
callback_wrapper._nr_channel = instance
except Exception:
pass

queue, args, kwargs = wrap_get(callback_wrapper, *args, **kwargs)
return wrapped(*args, **kwargs)

Expand Down Expand Up @@ -266,6 +306,11 @@ def _possibly_create_traces(yielded):
)
bt.__enter__()

# Extract host from channel to add as an agent attribute
host = instance_info(instance)
if bt and host:
bt._add_agent_attribute("server.address", host)

return bt

def _generator(generator):
Expand Down Expand Up @@ -376,6 +421,11 @@ def callback_wrapper(wrapped, instance, args, kwargs):
m = mt._transaction_metrics.get(KWARGS_ERROR, 0)
mt._transaction_metrics[KWARGS_ERROR] = m + 1

# Extract host from channel to add as an agent attribute
host = instance_info(channel)
if mt and host:
mt._add_agent_attribute("server.address", host)

return wrapped(*args, **kwargs)

queue, args, kwargs = wrap_consume(callback_wrapper, *args, **kwargs)
Expand Down
18 changes: 13 additions & 5 deletions tests/messagebroker_pika/test_pika_async_connection_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
from testing_support.db_settings import rabbitmq_settings
from testing_support.fixtures import (
capture_transaction_metrics,
dt_enabled,
function_not_called,
override_application_settings,
)
from testing_support.validators.validate_code_level_metrics import (
validate_code_level_metrics,
)
from testing_support.validators.validate_span_events import validate_span_events
from testing_support.validators.validate_transaction_metrics import (
validate_transaction_metrics,
)
Expand Down Expand Up @@ -98,13 +100,19 @@ def handle_callback_exception(self, *args, **kwargs):

@parametrized_connection
@pytest.mark.parametrize("callback_as_partial", [True, False])
@dt_enabled
@validate_code_level_metrics(
"test_pika_async_connection_consume.test_async_connection_basic_get_inside_txn.<locals>",
"on_message",
py2_namespace="test_pika_async_connection_consume",
)
@validate_span_events(
count=1,
exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Consume/Named/%s" % EXCHANGE},
exact_agents={"server.address": DB_SETTINGS["host"]},
)
@validate_transaction_metrics(
("test_pika_async_connection_consume:" "test_async_connection_basic_get_inside_txn"),
"test_pika_async_connection_consume:test_async_connection_basic_get_inside_txn",
scoped_metrics=_test_select_conn_basic_get_inside_txn_metrics,
rollup_metrics=_test_select_conn_basic_get_inside_txn_metrics,
background_task=True,
Expand Down Expand Up @@ -192,7 +200,7 @@ def on_open_connection(connection):
)
@parametrized_connection
@validate_transaction_metrics(
("test_pika_async_connection_consume:" "test_async_connection_basic_get_inside_txn_no_callback"),
"test_pika_async_connection_consume:test_async_connection_basic_get_inside_txn_no_callback",
scoped_metrics=_test_select_conn_basic_get_inside_txn_no_callback_metrics,
rollup_metrics=_test_select_conn_basic_get_inside_txn_no_callback_metrics,
background_task=True,
Expand Down Expand Up @@ -228,7 +236,7 @@ def on_open_connection(connection):
@parametrized_connection
@pytest.mark.parametrize("callback_as_partial", [True, False])
@validate_transaction_metrics(
("test_pika_async_connection_consume:" "test_async_connection_basic_get_empty"),
"test_pika_async_connection_consume:test_async_connection_basic_get_empty",
scoped_metrics=_test_async_connection_basic_get_empty_metrics,
rollup_metrics=_test_async_connection_basic_get_empty_metrics,
background_task=True,
Expand Down Expand Up @@ -285,7 +293,7 @@ def on_open_connection(connection):

@parametrized_connection
@validate_transaction_metrics(
("test_pika_async_connection_consume:" "test_async_connection_basic_consume_inside_txn"),
"test_pika_async_connection_consume:test_async_connection_basic_consume_inside_txn",
scoped_metrics=_test_select_conn_basic_consume_in_txn_metrics,
rollup_metrics=_test_select_conn_basic_consume_in_txn_metrics,
background_task=True,
Expand Down Expand Up @@ -361,7 +369,7 @@ def on_open_connection(connection):

@parametrized_connection
@validate_transaction_metrics(
("test_pika_async_connection_consume:" "test_async_connection_basic_consume_two_exchanges"),
"test_pika_async_connection_consume:test_async_connection_basic_consume_two_exchanges",
scoped_metrics=_test_select_conn_basic_consume_two_exchanges,
rollup_metrics=_test_select_conn_basic_consume_two_exchanges,
background_task=True,
Expand Down
28 changes: 21 additions & 7 deletions tests/messagebroker_pika/test_pika_blocking_connection_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
from compat import basic_consume
from conftest import BODY, CORRELATION_ID, EXCHANGE, HEADERS, QUEUE, REPLY_TO
from testing_support.db_settings import rabbitmq_settings
from testing_support.fixtures import capture_transaction_metrics
from testing_support.fixtures import capture_transaction_metrics, dt_enabled
from testing_support.validators.validate_code_level_metrics import (
validate_code_level_metrics,
)
from testing_support.validators.validate_span_events import validate_span_events
from testing_support.validators.validate_transaction_metrics import (
validate_transaction_metrics,
)
Expand All @@ -48,17 +49,23 @@
_test_blocking_connection_basic_get_metrics = [
("MessageBroker/RabbitMQ/Exchange/Produce/Named/%s" % EXCHANGE, None),
("MessageBroker/RabbitMQ/Exchange/Consume/Named/%s" % EXCHANGE, 1),
(("Function/pika.adapters.blocking_connection:" "_CallbackResult.set_value_once"), 1),
("Function/pika.adapters.blocking_connection:_CallbackResult.set_value_once", 1),
]


@dt_enabled
@validate_transaction_metrics(
("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_get"),
"test_pika_blocking_connection_consume:test_blocking_connection_basic_get",
scoped_metrics=_test_blocking_connection_basic_get_metrics,
rollup_metrics=_test_blocking_connection_basic_get_metrics,
background_task=True,
)
@validate_tt_collector_json(message_broker_params=_message_broker_tt_params)
@validate_span_events(
count=1,
exact_intrinsics={"name": "MessageBroker/RabbitMQ/Exchange/Consume/Named/%s" % EXCHANGE},
exact_agents={"server.address": DB_SETTINGS["host"]},
)
@background_task()
def test_blocking_connection_basic_get(producer):
with pika.BlockingConnection(pika.ConnectionParameters(DB_SETTINGS["host"])) as connection:
Expand All @@ -75,7 +82,7 @@ def test_blocking_connection_basic_get(producer):


@validate_transaction_metrics(
("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_get_empty"),
"test_pika_blocking_connection_consume:test_blocking_connection_basic_get_empty",
scoped_metrics=_test_blocking_connection_basic_get_empty_metrics,
rollup_metrics=_test_blocking_connection_basic_get_empty_metrics,
background_task=True,
Expand Down Expand Up @@ -137,13 +144,14 @@ def test_basic_get():
)
)
else:
_txn_name = "test_pika_blocking_connection_consume:" "on_message"
_txn_name = "test_pika_blocking_connection_consume:on_message"
_test_blocking_conn_basic_consume_no_txn_metrics.append(
("Function/test_pika_blocking_connection_consume:on_message", None)
)


@pytest.mark.parametrize("as_partial", [True, False])
@dt_enabled
@validate_code_level_metrics(
"test_pika_blocking_connection_consume.test_blocking_connection_basic_consume_outside_transaction.<locals>",
"on_message",
Expand All @@ -157,6 +165,11 @@ def test_basic_get():
group="Message/RabbitMQ/Exchange/%s" % EXCHANGE,
)
@validate_tt_collector_json(message_broker_params=_message_broker_tt_params)
@validate_span_events(
count=1,
exact_intrinsics={"name": "Message/RabbitMQ/Exchange/%s/%s" % (EXCHANGE, _txn_name)},
exact_agents={"server.address": DB_SETTINGS["host"]},
)
def test_blocking_connection_basic_consume_outside_transaction(producer, as_partial):
def on_message(channel, method_frame, header_frame, body):
assert hasattr(method_frame, "_nr_start_time")
Expand Down Expand Up @@ -200,13 +213,14 @@ def on_message(channel, method_frame, header_frame, body):


@pytest.mark.parametrize("as_partial", [True, False])
@dt_enabled
@validate_code_level_metrics(
"test_pika_blocking_connection_consume.test_blocking_connection_basic_consume_inside_txn.<locals>",
"on_message",
py2_namespace="test_pika_blocking_connection_consume",
)
@validate_transaction_metrics(
("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_consume_inside_txn"),
"test_pika_blocking_connection_consume:test_blocking_connection_basic_consume_inside_txn",
scoped_metrics=_test_blocking_conn_basic_consume_in_txn_metrics,
rollup_metrics=_test_blocking_conn_basic_consume_in_txn_metrics,
background_task=True,
Expand Down Expand Up @@ -257,7 +271,7 @@ def on_message(channel, method_frame, header_frame, body):

@pytest.mark.parametrize("as_partial", [True, False])
@validate_transaction_metrics(
("test_pika_blocking_connection_consume:" "test_blocking_connection_basic_consume_stopped_txn"),
"test_pika_blocking_connection_consume:test_blocking_connection_basic_consume_stopped_txn",
scoped_metrics=_test_blocking_conn_basic_consume_stopped_txn_metrics,
rollup_metrics=_test_blocking_conn_basic_consume_stopped_txn_metrics,
background_task=True,
Expand Down
Loading

0 comments on commit 4e2dd16

Please sign in to comment.