Skip to content

Commit

Permalink
Create AWS wrapper in botocore instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
hmstepanek committed Jul 29, 2024
1 parent 8ba20fe commit 28a984d
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 42 deletions.
37 changes: 3 additions & 34 deletions newrelic/api/message_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,7 @@ def create_node(self):


def MessageTraceWrapper(
wrapped,
library,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
wrapped, library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None
):
def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
Expand Down Expand Up @@ -143,15 +135,6 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
else:
_destination_name = destination_name

_agent_attrs = {}
if callable(extract_agent_attrs):
if instance is not None:
_agent_attrs = extract_agent_attrs(instance, *args, **kwargs)
else:
_agent_attrs = extract_agent_attrs(*args, **kwargs)
else:
_agent_attrs = extract_agent_attrs

trace = MessageTrace(
_library,
_operation,
Expand All @@ -163,9 +146,6 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
source=wrapped,
)

# Attach extracted agent attributes.
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)

Expand All @@ -175,16 +155,7 @@ def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
return FunctionWrapper(wrapped, _nr_message_trace_wrapper_)


def message_trace(
library,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
def message_trace(library, operation, destination_type, destination_name, params={}, terminal=True, async_wrapper=None):
return functools.partial(
MessageTraceWrapper,
library=library,
Expand All @@ -194,7 +165,6 @@ def message_trace(
params=params,
terminal=terminal,
async_wrapper=async_wrapper,
extract_agent_attrs=extract_agent_attrs,
)


Expand All @@ -208,11 +178,10 @@ def wrap_message_trace(
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
wrap_object(
module,
object_path,
MessageTraceWrapper,
(library, operation, destination_type, destination_name, params, terminal, async_wrapper, extract_agent_attrs),
(library, operation, destination_type, destination_name, params, terminal, async_wrapper),
)
88 changes: 80 additions & 8 deletions newrelic/hooks/external_botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import json
import logging
import re
Expand All @@ -25,10 +26,12 @@
from newrelic.api.datastore_trace import datastore_trace
from newrelic.api.external_trace import ExternalTrace
from newrelic.api.function_trace import FunctionTrace
from newrelic.api.message_trace import message_trace
from newrelic.api.time_trace import get_trace_linking_metadata
from newrelic.api.message_trace import MessageTrace, message_trace
from newrelic.api.time_trace import current_trace, get_trace_linking_metadata
from newrelic.api.transaction import current_transaction
from newrelic.common.async_wrapper import async_wrapper as get_async_wrapper
from newrelic.common.object_wrapper import (
FunctionWrapper,
ObjectProxy,
function_wrapper,
wrap_function_wrapper,
Expand Down Expand Up @@ -836,6 +839,75 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
)


def sqs_message_trace(
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
return functools.partial(
SQSMessageTraceWrapper,
operation=operation,
destination_type=destination_type,
destination_name=destination_name,
params=params,
terminal=terminal,
async_wrapper=async_wrapper,
extract_agent_attrs=extract_agent_attrs,
)


def SQSMessageTraceWrapper(
wrapped,
operation,
destination_type,
destination_name,
params={},
terminal=True,
async_wrapper=None,
extract_agent_attrs=None,
):
def _nr_message_trace_wrapper_(wrapped, instance, args, kwargs):
wrapper = async_wrapper if async_wrapper is not None else get_async_wrapper(wrapped)
if not wrapper:
parent = current_trace()
if not parent:
return wrapped(*args, **kwargs)
else:
parent = None

_library = "SQS"
_operation = operation
_destination_type = destination_type
_destination_name = destination_name(*args, **kwargs)

trace = MessageTrace(
_library,
_operation,
_destination_type,
_destination_name,
params={},
terminal=terminal,
parent=parent,
source=wrapped,
)

# Attach extracted agent attributes.
_agent_attrs = extract_agent_attrs(*args, **kwargs)
trace.agent_attributes.update(_agent_attrs)

if wrapper: # pylint: disable=W0125,W0126
return wrapper(wrapped, trace)(*args, **kwargs)

with trace:
return wrapped(*args, **kwargs)

return FunctionWrapper(wrapped, _nr_message_trace_wrapper_)


CUSTOM_TRACE_POINTS = {
("sns", "publish"): message_trace("SNS", "Produce", "Topic", extract(("TopicArn", "TargetArn"), "PhoneNumber")),
("dynamodb", "put_item"): datastore_trace("DynamoDB", extract("TableName"), "put_item"),
Expand All @@ -846,14 +918,14 @@ def handle_chat_completion_event(transaction, bedrock_attrs):
("dynamodb", "delete_table"): datastore_trace("DynamoDB", extract("TableName"), "delete_table"),
("dynamodb", "query"): datastore_trace("DynamoDB", extract("TableName"), "query"),
("dynamodb", "scan"): datastore_trace("DynamoDB", extract("TableName"), "scan"),
("sqs", "send_message"): message_trace(
"SQS", "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
("sqs", "send_message"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("sqs", "send_message_batch"): message_trace(
"SQS", "Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
("sqs", "send_message_batch"): sqs_message_trace(
"Produce", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("sqs", "receive_message"): message_trace(
"SQS", "Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
("sqs", "receive_message"): sqs_message_trace(
"Consume", "Queue", extract_sqs, extract_agent_attrs=extract_agent_attrs
),
("bedrock-runtime", "invoke_model"): wrap_bedrock_runtime_invoke_model(response_streaming=False),
("bedrock-runtime", "invoke_model_with_response_stream"): wrap_bedrock_runtime_invoke_model(
Expand Down

0 comments on commit 28a984d

Please sign in to comment.