diff --git a/newrelic/api/message_trace.py b/newrelic/api/message_trace.py index e0fa5956d..9cbe92361 100644 --- a/newrelic/api/message_trace.py +++ b/newrelic/api/message_trace.py @@ -22,7 +22,6 @@ class MessageTrace(CatHeaderMixin, TimeTrace): - cat_id_key = "NewRelicID" cat_transaction_key = "NewRelicTransaction" cat_appdata_key = "NewRelicAppData" @@ -92,7 +91,9 @@ def create_node(self): ) -def MessageTraceWrapper(wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None): +def MessageTraceWrapper( + wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None +): def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) if not wrapper: @@ -134,7 +135,16 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): else: _destination_name = destination_name - trace = MessageTrace(_library, _operation, _destination_type, _destination_name, params={}, terminal=terminal, parent=parent, source=wrapped) + trace = MessageTrace( + _library, + _operation, + _destination_type, + _destination_name, + params={}, + terminal=terminal, + parent=parent, + source=wrapped, + ) if wrapper: # pylint: disable=W0125,W0126 return wrapper(wrapped, trace)(*args, **kwargs) @@ -158,7 +168,20 @@ def message_trace(library, operation, destination_type, destination_name, params ) -def wrap_message_trace(module, object_path, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None): +def wrap_message_trace( + module, + object_path, + library, + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, +): wrap_object( - module, object_path, MessageTraceWrapper, (library, operation, destination_type, destination_name, params, terminal, async_wrapper) + module, + object_path, + MessageTraceWrapper, + (library, operation, destination_type, destination_name, params, terminal, async_wrapper), ) diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index e0f055f47..8cc6aff46 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -86,6 +86,10 @@ "response.headers.contentLength", "response.headers.contentType", "response.status", + "messaging.system", + "cloud.region", + "cloud.account.id", + "messaging.destination.name", ) ) diff --git a/newrelic/hooks/external_botocore.py b/newrelic/hooks/external_botocore.py index 138fd5d9a..4f0b7a6a7 100644 --- a/newrelic/hooks/external_botocore.py +++ b/newrelic/hooks/external_botocore.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import functools import json import logging +import re import sys import traceback import uuid @@ -24,10 +26,12 @@ from newrelic.api.datastore_trace import datastore_trace from newrelic.api.external_trace import ExternalTrace from newrelic.api.function_trace import FunctionTrace -from newrelic.api.message_trace import message_trace -from newrelic.api.time_trace import get_trace_linking_metadata +from newrelic.api.message_trace import MessageTrace, message_trace +from newrelic.api.time_trace import current_trace, get_trace_linking_metadata from newrelic.api.transaction import current_transaction +from newrelic.common.async_wrapper import async_wrapper as get_async_wrapper from newrelic.common.object_wrapper import ( + FunctionWrapper, ObjectProxy, function_wrapper, wrap_function_wrapper, @@ -35,6 +39,7 @@ from newrelic.common.package_version_utils import get_package_version from newrelic.core.config import global_settings +QUEUE_URL_PATTERN = re.compile(r"https://sqs.([\w\d-]+).amazonaws.com/(\d+)/([^/]+)") BOTOCORE_VERSION = get_package_version("botocore") @@ -54,6 +59,23 @@ def extract_sqs(*args, **kwargs): return queue_value.rsplit("/", 1)[-1] +def extract_agent_attrs(*args, **kwargs): + # Try to capture AWS SQS info as agent attributes. Log any exception to debug. + agent_attrs = {} + try: + queue_url = kwargs.get("QueueUrl") + if queue_url: + m = QUEUE_URL_PATTERN.match(queue_url) + if m: + agent_attrs["messaging.system"] = "aws_sqs" + agent_attrs["cloud.region"] = m.group(1) + agent_attrs["cloud.account.id"] = m.group(2) + agent_attrs["messaging.destination.name"] = m.group(3) + except Exception as e: + _logger.debug("Failed to capture AWS SQS info.", exc_info=True) + return agent_attrs + + def extract(argument_names, default=None): def extractor_list(*args, **kwargs): for argument_name in argument_names: @@ -817,6 +839,75 @@ def handle_chat_completion_event(transaction, bedrock_attrs): ) +def sqs_message_trace( + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): + return functools.partial( + SQSMessageTraceWrapper, + operation=operation, + destination_type=destination_type, + destination_name=destination_name, + params=params, + terminal=terminal, + async_wrapper=async_wrapper, + extract_agent_attrs=extract_agent_attrs, + ) + + +def SQSMessageTraceWrapper( + wrapped, + operation, + destination_type, + destination_name, + params={}, + terminal=True, + async_wrapper=None, + extract_agent_attrs=None, +): + def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs): + wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped) + if not wrapper: + parent = current_trace() + if not parent: + return wrapped(*args, **kwargs) + else: + parent = None + + _library = "SQS" + _operation = operation + _destination_type = destination_type + _destination_name = destination_name(*args, **kwargs) + + trace = MessageTrace( + _library, + _operation, + _destination_type, + _destination_name, + params={}, + terminal=terminal, + parent=parent, + source=wrapped, + ) + + # Attach extracted agent attributes. + _agent_attrs = extract_agent_attrs(*args, **kwargs) + trace.agent_attributes.update(_agent_attrs) + + if wrapper: # pylint: disable=W0125,W0126 + return wrapper(wrapped, trace)(*args, **kwargs) + + with trace: + return wrapped(*args, **kwargs) + + return FunctionWrapper(wrapped, _nr_message_trace_wrapper_) + + CUSTOM_TRACE_POINTS = { ("sns", "publish"): message_trace("SNS", "Produce", "Topic", extract(("TopicArn", "TargetArn"), "PhoneNumber")), ("dynamodb", "put_item"): datastore_trace("DynamoDB", extract("TableName"), "put_item"), @@ -827,9 +918,15 @@ def handle_chat_completion_event(transaction, bedrock_attrs): ("dynamodb", "delete_table"): datastore_trace("DynamoDB", extract("TableName"), "delete_table"), ("dynamodb", "query"): datastore_trace("DynamoDB", extract("TableName"), "query"), ("dynamodb", "scan"): datastore_trace("DynamoDB", extract("TableName"), "scan"), - ("sqs", "send_message"): message_trace("SQS", "Produce", "Queue", extract_sqs), - ("sqs", "send_message_batch"): message_trace("SQS", "Produce", "Queue", extract_sqs), - ("sqs", "receive_message"): message_trace("SQS", "Consume", "Queue", extract_sqs), + ("sqs", "send_message"): sqs_message_trace( + "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ), + ("sqs", "send_message_batch"): sqs_message_trace( + "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ), + ("sqs", "receive_message"): sqs_message_trace( + "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs + ), ("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False), ("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model( response_streaming=True diff --git a/tests/external_botocore/test_botocore_sqs.py b/tests/external_botocore/test_botocore_sqs.py index 67744d133..f5ab9d6b6 100644 --- a/tests/external_botocore/test_botocore_sqs.py +++ b/tests/external_botocore/test_botocore_sqs.py @@ -30,8 +30,51 @@ BOTOCORE_VERSION = get_package_version_tuple("botocore") url = "sqs.us-east-1.amazonaws.com" +EXPECTED_SEND_MESSAGE_AGENT_ATTRS = { + "expected_agents": ["messaging.destination.name"], + "exact_agents": { + "aws.operation": "SendMessage", + "cloud.account.id": "123456789012", + "cloud.region": "us-east-1", + "messaging.system": "aws_sqs", + }, +} +EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = { + "expected_agents": ["messaging.destination.name"], + "exact_agents": { + "aws.operation": "ReceiveMessage", + "cloud.account.id": "123456789012", + "cloud.region": "us-east-1", + "messaging.system": "aws_sqs", + }, +} +EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = required = { + "expected_agents": ["messaging.destination.name"], + "exact_agents": { + "aws.operation": "SendMessageBatch", + "cloud.account.id": "123456789012", + "cloud.region": "us-east-1", + "messaging.system": "aws_sqs", + }, +} if BOTOCORE_VERSION < (1, 29, 0): url = "queue.amazonaws.com" + # The old style url does not contain the necessary AWS info. + EXPECTED_SEND_MESSAGE_AGENT_ATTRS = { + "exact_agents": { + "aws.operation": "SendMessage", + }, + } + EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = { + "exact_agents": { + "aws.operation": "ReceiveMessage", + }, + } + EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = { + "exact_agents": { + "aws.operation": "SendMessageBatch", + }, + } AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY" AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec @@ -65,9 +108,18 @@ @dt_enabled @validate_span_events(exact_agents={"aws.operation": "CreateQueue"}, count=1) -@validate_span_events(exact_agents={"aws.operation": "SendMessage"}, count=1) -@validate_span_events(exact_agents={"aws.operation": "ReceiveMessage"}, count=1) -@validate_span_events(exact_agents={"aws.operation": "SendMessageBatch"}, count=1) +@validate_span_events( + **EXPECTED_SEND_MESSAGE_AGENT_ATTRS, + count=1, +) +@validate_span_events( + **EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS, + count=1, +) +@validate_span_events( + **EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS, + count=1, +) @validate_span_events(exact_agents={"aws.operation": "PurgeQueue"}, count=1) @validate_span_events(exact_agents={"aws.operation": "DeleteQueue"}, count=1) @validate_transaction_metrics(