From 8ba20fe43066dcb05d4c156937426e4d1ac3e1a8 Mon Sep 17 00:00:00 2001 From: Hannah Stepanek Date: Mon, 22 Jul 2024 17:04:33 -0700 Subject: [PATCH 1/2] Add AWS SQS agent attributes --- newrelic/api/message_trace.py | 66 ++++++++++++++++++-- newrelic/core/attribute.py | 4 ++ newrelic/hooks/external_botocore.py | 31 ++++++++- tests/external_botocore/test_botocore_sqs.py | 58 ++++++++++++++++- 4 files changed, 147 insertions(+), 12 deletions(-) diff --git a/newrelic/api/message_trace.py b/newrelic/api/message_trace.py index e0fa5956d0..27a1c55eed 100644 --- a/newrelic/api/message_trace.py +++ b/newrelic/api/message_trace.py @@ -22,7 +22,6 @@ class MessageTrace(CatHeaderMixin, TimeTrace): - cat_id_key = "NewRelicID" cat_transaction_key = "NewRelicTransaction" cat_appdata_key = "NewRelicAppData" @@ -92,7 +91,17 @@ def create_node(self): ) -def MessageTraceWrapper(wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None): +def MessageTraceWrapper( + wrapped, + library, + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) if not wrapper: @@ -134,7 +143,28 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): else: _destination_name = destination_name - trace = MessageTrace(_library, _operation, _destination_type, _destination_name, params={}, terminal=terminal, parent=parent, source=wrapped) + _agent_attrs = {} + if callable(extract_agent_attrs): + if instance is not None: + _agent_attrs = extract_agent_attrs(instance, *args, **kwargs) + else: + _agent_attrs = extract_agent_attrs(*args, **kwargs) + else: + _agent_attrs = extract_agent_attrs + + trace = MessageTrace( + _library, + _operation, + _destination_type, + _destination_name, + params={}, + terminal=terminal, + parent=parent, + source=wrapped, + ) + + # Attach extracted agent attributes. + trace.agent_attributes.update(_agent_attrs) if wrapper: # pylint: disable=W0125,W0126 return wrapper(wrapped, trace)(*args, **kwargs) @@ -145,7 +175,16 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): return FunctionWrapper(wrapped, _nr_message_trace_wrapper_) -def message_trace(library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None): +def message_trace( + library, + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): return functools.partial( MessageTraceWrapper, library=library, @@ -155,10 +194,25 @@ def message_trace(library, operation, destination_type, destination_name, params params=params, terminal=terminal, async_wrapper=async_wrapper, + extract_agent_attrs=extract_agent_attrs, ) -def wrap_message_trace(module, object_path, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None): +def wrap_message_trace( + module, + object_path, + library, + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): wrap_object( - module, object_path, MessageTraceWrapper, (library, operation, destination_type, destination_name, params, terminal, async_wrapper) + module, + object_path, + MessageTraceWrapper, + (library, operation, destination_type, destination_name, params, terminal, async_wrapper, extract_agent_attrs), ) diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index e0f055f47e..8cc6aff46d 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -86,6 +86,10 @@ "response.headers.contentLength", "response.headers.contentType", "response.status", + "messaging.system", + "cloud.region", + "cloud.account.id", + "messaging.destination.name", ) ) diff --git a/newrelic/hooks/external_botocore.py b/newrelic/hooks/external_botocore.py index 138fd5d9a2..5a44beac12 100644 --- a/newrelic/hooks/external_botocore.py +++ b/newrelic/hooks/external_botocore.py @@ -14,6 +14,7 @@ import json import logging +import re import sys import traceback import uuid @@ -35,6 +36,7 @@ from newrelic.common.package_version_utils import get_package_version from newrelic.core.config import global_settings +QUEUE_URL_PATTERN = re.compile(r"https://sqs.([\w\d-]+).amazonaws.com/(\d+)/([^/]+)") BOTOCORE_VERSION = get_package_version("botocore") @@ -54,6 +56,23 @@ def extract_sqs(*args, **kwargs): return queue_value.rsplit("/", 1)[-1] +def extract_agent_attrs(*args, **kwargs): + # Try to capture AWS SQS info as agent attributes. Log any exception to debug. + agent_attrs = {} + try: + queue_url = kwargs.get("QueueUrl") + if queue_url: + m = QUEUE_URL_PATTERN.match(queue_url) + if m: + agent_attrs["messaging.system"] = "aws_sqs" + agent_attrs["cloud.region"] = m.group(1) + agent_attrs["cloud.account.id"] = m.group(2) + agent_attrs["messaging.destination.name"] = m.group(3) + except Exception as e: + _logger.debug("Failed to capture AWS SQS info.", exc_info=True) + return agent_attrs + + def extract(argument_names, default=None): def extractor_list(*args, **kwargs): for argument_name in argument_names: @@ -827,9 +846,15 @@ def handle_chat_completion_event(transaction, bedrock_attrs): ("dynamodb", "delete_table"): datastore_trace("DynamoDB", extract("TableName"), "delete_table"), ("dynamodb", "query"): datastore_trace("DynamoDB", extract("TableName"), "query"), ("dynamodb", "scan"): datastore_trace("DynamoDB", extract("TableName"), "scan"), - ("sqs", "send_message"): message_trace("SQS", "Produce", "Queue", extract_sqs), - ("sqs", "send_message_batch"): message_trace("SQS", "Produce", "Queue", extract_sqs), - ("sqs", "receive_message"): message_trace("SQS", "Consume", "Queue", extract_sqs), + ("sqs", "send_message"): message_trace( + "SQS", "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ), + ("sqs", "send_message_batch"): message_trace( + "SQS", "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ), + ("sqs", "receive_message"): message_trace( + "SQS", "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ), ("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False), ("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model( response_streaming=True diff --git a/tests/external_botocore/test_botocore_sqs.py b/tests/external_botocore/test_botocore_sqs.py index 67744d1339..f5ab9d6b64 100644 --- a/tests/external_botocore/test_botocore_sqs.py +++ b/tests/external_botocore/test_botocore_sqs.py @@ -30,8 +30,51 @@ BOTOCORE_VERSION = get_package_version_tuple("botocore") url = "sqs.us-east-1.amazonaws.com" +EXPECTED_SEND_MESSAGE_AGENT_ATTRS = { + "expected_agents": ["messaging.destination.name"], + "exact_agents": { + "aws.operation": "SendMessage", + "cloud.account.id": "123456789012", + "cloud.region": "us-east-1", + "messaging.system": "aws_sqs", + }, +} +EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = { + "expected_agents": ["messaging.destination.name"], + "exact_agents": { + "aws.operation": "ReceiveMessage", + "cloud.account.id": "123456789012", + "cloud.region": "us-east-1", + "messaging.system": "aws_sqs", + }, +} +EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = required = { + "expected_agents": ["messaging.destination.name"], + "exact_agents": { + "aws.operation": "SendMessageBatch", + "cloud.account.id": "123456789012", + "cloud.region": "us-east-1", + "messaging.system": "aws_sqs", + }, +} if BOTOCORE_VERSION < (1, 29, 0): url = "queue.amazonaws.com" + # The old style url does not contain the necessary AWS info. + EXPECTED_SEND_MESSAGE_AGENT_ATTRS = { + "exact_agents": { + "aws.operation": "SendMessage", + }, + } + EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = { + "exact_agents": { + "aws.operation": "ReceiveMessage", + }, + } + EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = { + "exact_agents": { + "aws.operation": "SendMessageBatch", + }, + } AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY" AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec @@ -65,9 +108,18 @@ @dt_enabled @validate_span_events(exact_agents={"aws.operation": "CreateQueue"}, count=1) -@validate_span_events(exact_agents={"aws.operation": "SendMessage"}, count=1) -@validate_span_events(exact_agents={"aws.operation": "ReceiveMessage"}, count=1) -@validate_span_events(exact_agents={"aws.operation": "SendMessageBatch"}, count=1) +@validate_span_events( + **EXPECTED_SEND_MESSAGE_AGENT_ATTRS, + count=1, +) +@validate_span_events( + **EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS, + count=1, +) +@validate_span_events( + **EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS, + count=1, +) @validate_span_events(exact_agents={"aws.operation": "PurgeQueue"}, count=1) @validate_span_events(exact_agents={"aws.operation": "DeleteQueue"}, count=1) @validate_transaction_metrics( From 28a984d8886bdd758b3c96b6aa696b6039076fc8 Mon Sep 17 00:00:00 2001 From: Hannah Stepanek Date: Mon, 29 Jul 2024 10:30:18 -0700 Subject: [PATCH 2/2] Create AWS wrapper in botocore instrumentation --- newrelic/api/message_trace.py | 37 +----------- newrelic/hooks/external_botocore.py | 88 ++++++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 42 deletions(-) diff --git a/newrelic/api/message_trace.py b/newrelic/api/message_trace.py index 27a1c55eed..9cbe923619 100644 --- a/newrelic/api/message_trace.py +++ b/newrelic/api/message_trace.py @@ -92,15 +92,7 @@ def create_node(self): def MessageTraceWrapper( - wrapped, - library, - operation, - destination_type, - destination_name, - params={}, - terminal=True, - async_wrapper=None, - extract_agent_attrs=None, + wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None ): def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) @@ -143,15 +135,6 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): else: _destination_name = destination_name - _agent_attrs = {} - if callable(extract_agent_attrs): - if instance is not None: - _agent_attrs = extract_agent_attrs(instance, *args, **kwargs) - else: - _agent_attrs = extract_agent_attrs(*args, **kwargs) - else: - _agent_attrs = extract_agent_attrs - trace = MessageTrace( _library, _operation, @@ -163,9 +146,6 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): source=wrapped, ) - # Attach extracted agent attributes. - trace.agent_attributes.update(_agent_attrs) - if wrapper: # pylint: disable=W0125,W0126 return wrapper(wrapped, trace)(*args, **kwargs) @@ -175,16 +155,7 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): return FunctionWrapper(wrapped, _nr_message_trace_wrapper_) -def message_trace( - library, - operation, - destination_type, - destination_name, - params={}, - terminal=True, - async_wrapper=None, - extract_agent_attrs=None, -): +def message_trace(library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None): return functools.partial( MessageTraceWrapper, library=library, @@ -194,7 +165,6 @@ def message_trace( params=params, terminal=terminal, async_wrapper=async_wrapper, - extract_agent_attrs=extract_agent_attrs, ) @@ -208,11 +178,10 @@ def wrap_message_trace( params={}, terminal=True, async_wrapper=None, - extract_agent_attrs=None, ): wrap_object( module, object_path, MessageTraceWrapper, - (library, operation, destination_type, destination_name, params, terminal, async_wrapper, extract_agent_attrs), + (library, operation, destination_type, destination_name, params, terminal, async_wrapper), ) diff --git a/newrelic/hooks/external_botocore.py b/newrelic/hooks/external_botocore.py index 5a44beac12..4f0b7a6a71 100644 --- a/newrelic/hooks/external_botocore.py +++ b/newrelic/hooks/external_botocore.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import functools import json import logging import re @@ -25,10 +26,12 @@ from newrelic.api.datastore_trace import datastore_trace from newrelic.api.external_trace import ExternalTrace from newrelic.api.function_trace import FunctionTrace -from newrelic.api.message_trace import message_trace -from newrelic.api.time_trace import get_trace_linking_metadata +from newrelic.api.message_trace import MessageTrace, message_trace +from newrelic.api.time_trace import current_trace, get_trace_linking_metadata from newrelic.api.transaction import current_transaction +from newrelic.common.async_wrapper import async_wrapper as get_async_wrapper from newrelic.common.object_wrapper import ( + FunctionWrapper, ObjectProxy, function_wrapper, wrap_function_wrapper, @@ -836,6 +839,75 @@ def handle_chat_completion_event(transaction, bedrock_attrs): ) +def sqs_message_trace( + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): + return functools.partial( + SQSMessageTraceWrapper, + operation=operation, + destination_type=destination_type, + destination_name=destination_name, + params=params, + terminal=terminal, + async_wrapper=async_wrapper, + extract_agent_attrs=extract_agent_attrs, + ) + + +def SQSMessageTraceWrapper( + wrapped, + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): + def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): + wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) + if not wrapper: + parent = current_trace() + if not parent: + return wrapped(*args, **kwargs) + else: + parent = None + + _library = "SQS" + _operation = operation + _destination_type = destination_type + _destination_name = destination_name(*args, **kwargs) + + trace = MessageTrace( + _library, + _operation, + _destination_type, + _destination_name, + params={}, + terminal=terminal, + parent=parent, + source=wrapped, + ) + + # Attach extracted agent attributes. + _agent_attrs = extract_agent_attrs(*args, **kwargs) + trace.agent_attributes.update(_agent_attrs) + + if wrapper: # pylint: disable=W0125,W0126 + return wrapper(wrapped, trace)(*args, **kwargs) + + with trace: + return wrapped(*args, **kwargs) + + return FunctionWrapper(wrapped, _nr_message_trace_wrapper_) + + CUSTOM_TRACE_POINTS = { ("sns", "publish"): message_trace("SNS", "Produce", "Topic", extract(("TopicArn", "TargetArn"), "PhoneNumber")), ("dynamodb", "put_item"): datastore_trace("DynamoDB", extract("TableName"), "put_item"), @@ -846,14 +918,14 @@ def handle_chat_completion_event(transaction, bedrock_attrs): ("dynamodb", "delete_table"): datastore_trace("DynamoDB", extract("TableName"), "delete_table"), ("dynamodb", "query"): datastore_trace("DynamoDB", extract("TableName"), "query"), ("dynamodb", "scan"): datastore_trace("DynamoDB", extract("TableName"), "scan"), - ("sqs", "send_message"): message_trace( - "SQS", "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ("sqs", "send_message"): sqs_message_trace( + "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs ), - ("sqs", "send_message_batch"): message_trace( - "SQS", "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ("sqs", "send_message_batch"): sqs_message_trace( + "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs ), - ("sqs", "receive_message"): message_trace( - "SQS", "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ("sqs", "receive_message"): sqs_message_trace( + "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs ), ("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False), ("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(