Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture AWS SQS info on message spans #1183

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions newrelic/api/message_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@


class MessageTrace(CatHeaderMixin, TimeTrace):

cat_id_key = "NewRelicID"
cat_transaction_key = "NewRelicTransaction"
cat_appdata_key = "NewRelicAppData"
Expand Down Expand Up @@ -92,7 +91,9 @@ def create_node(self):
)


def MessageTraceWrapper(wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None):
def MessageTraceWrapper(
wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None
):
def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
Expand Down Expand Up @@ -134,7 +135,16 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
else:
_destination_name = destination_name

trace = MessageTrace(_library, _operation, _destination_type, _destination_name, params={}, terminal=terminal, parent=parent, source=wrapped)
trace = MessageTrace(
_library,
_operation,
_destination_type,
_destination_name,
params={},
terminal=terminal,
parent=parent,
source=wrapped,
)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)
Expand All @@ -158,7 +168,20 @@ def message_trace(library, operation, destination_type, destination_name, params
)


def wrap_message_trace(module, object_path, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None):
def wrap_message_trace(
module,
object_path,
library,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
):
wrap_object(
module, object_path, MessageTraceWrapper, (library, operation, destination_type, destination_name, params, terminal, async_wrapper)
module,
object_path,
MessageTraceWrapper,
(library, operation, destination_type, destination_name, params, terminal, async_wrapper),
)
4 changes: 4 additions & 0 deletions newrelic/core/attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
"response.headers.contentLength",
"response.headers.contentType",
"response.status",
"messaging.system",
"cloud.region",
"cloud.account.id",
"messaging.destination.name",
)
)

Expand Down
107 changes: 102 additions & 5 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import json
import logging
import re
import sys
import traceback
import uuid
Expand All @@ -24,17 +26,20 @@
from newrelic.api.datastore_trace import datastore_trace
from newrelic.api.external_trace import ExternalTrace
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.message_trace import message_trace
from newrelic.api.time_trace import get_trace_linking_metadata
from newrelic.api.message_trace import MessageTrace, message_trace
from newrelic.api.time_trace import current_trace, get_trace_linking_metadata
from newrelic.api.transaction import current_transaction
from newrelic.common.async_wrapper import async_wrapper as get_async_wrapper
from newrelic.common.object_wrapper import (
FunctionWrapper,
ObjectProxy,
function_wrapper,
wrap_function_wrapper,
)
from newrelic.common.package_version_utils import get_package_version
from newrelic.core.config import global_settings

QUEUE_URL_PATTERN = re.compile(r"https://sqs.([\w\d-]+).amazonaws.com/(\d+)/([^/]+)")
BOTOCORE_VERSION = get_package_version("botocore")


Expand All @@ -54,6 +59,23 @@ def extract_sqs(*args, **kwargs):
return queue_value.rsplit("/", 1)[-1]


def extract_agent_attrs(*args, **kwargs):
# Try to capture AWS SQS info as agent attributes. Log any exception to debug.
agent_attrs = {}
try:
queue_url = kwargs.get("QueueUrl")
if queue_url:
m = QUEUE_URL_PATTERN.match(queue_url)
if m:
agent_attrs["messaging.system"] = "aws_sqs"
agent_attrs["cloud.region"] = m.group(1)
agent_attrs["cloud.account.id"] = m.group(2)
agent_attrs["messaging.destination.name"] = m.group(3)
except Exception as e:
_logger.debug("Failed to capture AWS SQS info.", exc_info=True)
return agent_attrs


def extract(argument_names, default=None):
def extractor_list(*args, **kwargs):
for argument_name in argument_names:
Expand Down Expand Up @@ -817,6 +839,75 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
)


def sqs_message_trace(
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
return functools.partial(
SQSMessageTraceWrapper,
operation=operation,
destination_type=destination_type,
destination_name=destination_name,
params=params,
terminal=terminal,
async_wrapper=async_wrapper,
extract_agent_attrs=extract_agent_attrs,
)


def SQSMessageTraceWrapper(
wrapped,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
if not parent:
return wrapped(*args, **kwargs)
else:
parent = None

_library = "SQS"
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)

trace = MessageTrace(
_library,
_operation,
_destination_type,
_destination_name,
params={},
terminal=terminal,
parent=parent,
source=wrapped,
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(*args, **kwargs)
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)

with trace:
return wrapped(*args, **kwargs)

return FunctionWrapper(wrapped, _nr_message_trace_wrapper_)


CUSTOM_TRACE_POINTS = {
("sns", "publish"): message_trace("SNS", "Produce", "Topic", extract(("TopicArn", "TargetArn"), "PhoneNumber")),
("dynamodb", "put_item"): datastore_trace("DynamoDB", extract("TableName"), "put_item"),
Expand All @@ -827,9 +918,15 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
("dynamodb", "delete_table"): datastore_trace("DynamoDB", extract("TableName"), "delete_table"),
("dynamodb", "query"): datastore_trace("DynamoDB", extract("TableName"), "query"),
("dynamodb", "scan"): datastore_trace("DynamoDB", extract("TableName"), "scan"),
("sqs", "send_message"): message_trace("SQS", "Produce", "Queue", extract_sqs),
("sqs", "send_message_batch"): message_trace("SQS", "Produce", "Queue", extract_sqs),
("sqs", "receive_message"): message_trace("SQS", "Consume", "Queue", extract_sqs),
("sqs", "send_message"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("sqs", "send_message_batch"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("sqs", "receive_message"): sqs_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False),
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(
response_streaming=True
Expand Down
58 changes: 55 additions & 3 deletions tests/external_botocore/test_botocore_sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,51 @@
BOTOCORE_VERSION = get_package_version_tuple("botocore")

url = "sqs.us-east-1.amazonaws.com"
EXPECTED_SEND_MESSAGE_AGENT_ATTRS = {
"expected_agents": ["messaging.destination.name"],
"exact_agents": {
"aws.operation": "SendMessage",
"cloud.account.id": "123456789012",
"cloud.region": "us-east-1",
"messaging.system": "aws_sqs",
},
}
EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = {
"expected_agents": ["messaging.destination.name"],
"exact_agents": {
"aws.operation": "ReceiveMessage",
"cloud.account.id": "123456789012",
"cloud.region": "us-east-1",
"messaging.system": "aws_sqs",
},
}
EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = required = {
"expected_agents": ["messaging.destination.name"],
"exact_agents": {
"aws.operation": "SendMessageBatch",
"cloud.account.id": "123456789012",
"cloud.region": "us-east-1",
"messaging.system": "aws_sqs",
},
}
if BOTOCORE_VERSION < (1, 29, 0):
url = "queue.amazonaws.com"
# The old style url does not contain the necessary AWS info.
EXPECTED_SEND_MESSAGE_AGENT_ATTRS = {
"exact_agents": {
"aws.operation": "SendMessage",
},
}
EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS = {
"exact_agents": {
"aws.operation": "ReceiveMessage",
},
}
EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS = {
"exact_agents": {
"aws.operation": "SendMessageBatch",
},
}

AWS_ACCESS_KEY_ID = "AAAAAAAAAAAACCESSKEY"
AWS_SECRET_ACCESS_KEY = "AAAAAASECRETKEY" # nosec
Expand Down Expand Up @@ -65,9 +108,18 @@

@dt_enabled
@validate_span_events(exact_agents={"aws.operation": "CreateQueue"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "SendMessage"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "ReceiveMessage"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "SendMessageBatch"}, count=1)
@validate_span_events(
**EXPECTED_SEND_MESSAGE_AGENT_ATTRS,
count=1,
)
@validate_span_events(
**EXPECTED_RECIEVE_MESSAGE_AGENT_ATTRS,
count=1,
)
@validate_span_events(
**EXPECTED_SEND_MESSAGE_BATCH_AGENT_ATTRS,
count=1,
)
@validate_span_events(exact_agents={"aws.operation": "PurgeQueue"}, count=1)
@validate_span_events(exact_agents={"aws.operation": "DeleteQueue"}, count=1)
@validate_transaction_metrics(
Expand Down
Loading