Skip to content

Commit

Permalink
Merge pull request #1183 from newrelic/capture-aws-sqs-info
Browse files Browse the repository at this point in the history
Capture AWS SQS info on message spans
  • Loading branch information
hmstepanek committed Aug 1, 2024
2 parents a4ca19d + 28a984d commit 60fe0b6
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 13 deletions.
33 changes: 28 additions & 5 deletions newrelic/api/message_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@


class MessageTrace(CatHeaderMixin, TimeTrace):

cat_id_key = "NewRelicID"
cat_transaction_key = "NewRelicTransaction"
cat_appdata_key = "NewRelicAppData"
Expand Down Expand Up @@ -92,7 +91,9 @@ def create_node(self):
)


def MessageTraceWrapper(wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None):
def MessageTraceWrapper(
wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None
):
def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
Expand Down Expand Up @@ -134,7 +135,16 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
else:
_destination_name = destination_name

trace = MessageTrace(_library, _operation, _destination_type, _destination_name, params={}, terminal=terminal, parent=parent, source=wrapped)
trace = MessageTrace(
_library,
_operation,
_destination_type,
_destination_name,
params={},
terminal=terminal,
parent=parent,
source=wrapped,
)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)
Expand All @@ -158,7 +168,20 @@ def message_trace(library, operation, destination_type, destination_name, params
)


def wrap_message_trace(module, object_path, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None):
def wrap_message_trace(
module,
object_path,
library,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
):
wrap_object(
module, object_path, MessageTraceWrapper, (library, operation, destination_type, destination_name, params, terminal, async_wrapper)
module,
object_path,
MessageTraceWrapper,
(library, operation, destination_type, destination_name, params, terminal, async_wrapper),
)
4 changes: 4 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
"response.headers.contentLength",
"response.headers.contentType",
"response.status",
"messaging.system",
"cloud.region",
"cloud.account.id",
"messaging.destination.name",
)
)

Expand Down
107 changes: 102 additions & 5 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import json
import logging
import re
import sys
import traceback
import uuid
Expand All @@ -24,17 +26,20 @@
from newrelic.api.datastore_trace import datastore_trace
from newrelic.api.external_trace import ExternalTrace
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.message_trace import message_trace
from newrelic.api.time_trace import get_trace_linking_metadata
from newrelic.api.message_trace import MessageTrace, message_trace
from newrelic.api.time_trace import current_trace, get_trace_linking_metadata
from newrelic.api.transaction import current_transaction
from newrelic.common.async_wrapper import async_wrapper as get_async_wrapper
from newrelic.common.object_wrapper import (
FunctionWrapper,
ObjectProxy,
function_wrapper,
wrap_function_wrapper,
)
from newrelic.common.package_version_utils import get_package_version
from newrelic.core.config import global_settings

QUEUE_URL_PATTERN = re.compile(r"https://sqs.([\w\d-]+).amazonaws.com/(\d+)/([^/]+)")
BOTOCORE_VERSION = get_package_version("botocore")


Expand All @@ -54,6 +59,23 @@ def extract_sqs(*args, **kwargs):
return queue_value.rsplit("/", 1)[-1]


def extract_agent_attrs(*args, **kwargs):
# Try to capture AWS SQS info as agent attributes. Log any exception to debug.
agent_attrs = {}
try:
queue_url = kwargs.get("QueueUrl")
if queue_url:
m = QUEUE_URL_PATTERN.match(queue_url)
if m:
agent_attrs["messaging.system"] = "aws_sqs"
agent_attrs["cloud.region"] = m.group(1)
agent_attrs["cloud.account.id"] = m.group(2)
agent_attrs["messaging.destination.name"] = m.group(3)
except Exception as e:
_logger.debug("Failed to capture AWS SQS info.", exc_info=True)
return agent_attrs


def extract(argument_names, default=None):
def extractor_list(*args, **kwargs):
for argument_name in argument_names:
Expand Down Expand Up @@ -817,6 +839,75 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
)


def sqs_message_trace(
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
return functools.partial(
SQSMessageTraceWrapper,
operation=operation,
destination_type=destination_type,
destination_name=destination_name,
params=params,
terminal=terminal,
async_wrapper=async_wrapper,
extract_agent_attrs=extract_agent_attrs,
)


def SQSMessageTraceWrapper(
wrapped,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
if not parent:
return wrapped(*args, **kwargs)
else:
parent = None

_library = "SQS"
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)

trace = MessageTrace(
_library,
_operation,
_destination_type,
_destination_name,
params={},
terminal=terminal,
parent=parent,
source=wrapped,
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(*args, **kwargs)
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)

with trace:
return wrapped(*args, **kwargs)

return FunctionWrapper(wrapped, _nr_message_trace_wrapper_)


CUSTOM_TRACE_POINTS = {
("sns", "publish"): message_trace("SNS", "Produce", "Topic", extract(("TopicArn", "TargetArn"), "PhoneNumber")),
("dynamodb", "put_item"): datastore_trace("DynamoDB", extract("TableName"), "put_item"),
Expand All @@ -827,9 +918,15 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
("dynamodb", "delete_table"): datastore_trace("DynamoDB", extract("TableName"), "delete_table"),
("dynamodb", "query"): datastore_trace("DynamoDB", extract("TableName"), "query"),
("dynamodb", "scan"): datastore_trace("DynamoDB", extract("TableName"), "scan"),
("sqs", "send_message"): message_trace("SQS", "Produce", "Queue", extract_sqs),
("sqs", "send_message_batch"): message_trace("SQS", "Produce", "Queue", extract_sqs),
("sqs", "receive_message"): message_trace("SQS", "Consume", "Queue", extract_sqs),
("sqs", "send_message"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("sqs", "send_message_batch"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("sqs", "receive_message"): sqs_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False),
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(
response_streaming=True
Expand Down
58 changes: 55 additions & 3 deletions tests/external_botocore/test_botocore_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,51 @@
BOTOCORE_VERSION = get_package_version_tuple("botocore")

url = "sqs.us-east-1.amazonaws.com"
EXPECTED_SEND_MESSAGE_AGENT_ATTRS = {
"expected_agents": ["messaging.destination.name"],
"exact_agents": {
"aws.operation": "SendMessage",
"cloud.account.id": "123456789012",
"cloud.region": "us-east-1",
"messaging.system": "aws_sqs",
},
}
EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = {
"expected_agents": ["messaging.destination.name"],
"exact_agents": {
"aws.operation": "ReceiveMessage",
"cloud.account.id": "123456789012",
"cloud.region": "us-east-1",
"messaging.system": "aws_sqs",
},
}
EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = required = {
"expected_agents": ["messaging.destination.name"],
"exact_agents": {
"aws.operation": "SendMessageBatch",
"cloud.account.id": "123456789012",
"cloud.region": "us-east-1",
"messaging.system": "aws_sqs",
},
}
if BOTOCORE_VERSION < (1, 29, 0):
url = "queue.amazonaws.com"
# The old style url does not contain the necessary AWS info.
EXPECTED_SEND_MESSAGE_AGENT_ATTRS = {
"exact_agents": {
"aws.operation": "SendMessage",
},
}
EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = {
"exact_agents": {
"aws.operation": "ReceiveMessage",
},
}
EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = {
"exact_agents": {
"aws.operation": "SendMessageBatch",
},
}

AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY"
AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec
Expand Down Expand Up @@ -65,9 +108,18 @@

@dt_enabled
@validate_span_events(exact_agents={"aws.operation": "CreateQueue"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "SendMessage"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "ReceiveMessage"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "SendMessageBatch"}, count=1)
@validate_span_events(
**EXPECTED_SEND_MESSAGE_AGENT_ATTRS,
count=1,
)
@validate_span_events(
**EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS,
count=1,
)
@validate_span_events(
**EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS,
count=1,
)
@validate_span_events(exact_agents={"aws.operation": "PurgeQueue"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "DeleteQueue"}, count=1)
@validate_transaction_metrics(
Expand Down

0 comments on commit 60fe0b6

Please sign in to comment.