Skip to content

Commit

Permalink
Hook up ml event to OTLP (#822)
Browse files Browse the repository at this point in the history
* Use protos and otlp protocol class for ml_events

* inferenceData -> InferenceData

* Add LogsData import

* Add utf-8 encoding for json otlp payload

* Cast timestamp to int

* Use ml_event validator in tests

* Fixup payload tests

* Change str_value -> string_value

* Move event payload gen into otlp_utils

* Fixup: put back print

* Fixup: cast as str for py27

* Fixup lint errors

* Skip py2 protobuf

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
hmstepanek and mergify[bot] authored Jun 27, 2023
1 parent f6ec42e commit 8e45f4d
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 327 deletions.
6 changes: 1 addition & 5 deletions newrelic/core/agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,7 @@ def connect(
return protocol

def _to_http(self, method, payload=()):
params = dict(self._params)
params["method"] = method
if self._run_token:
params["run_id"] = self._run_token
return params, self._headers, otlp_encode(payload)
return {}, self._headers, otlp_encode(payload)

def decode_response(self, response):
return response.decode("utf-8")
10 changes: 3 additions & 7 deletions newrelic/core/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
)
from newrelic.core.agent_streaming import StreamingRpc
from newrelic.core.config import global_settings
from newrelic.core.otlp_utils import encode_metric_data
from newrelic.core.otlp_utils import encode_metric_data, encode_ml_event_data

_logger = logging.getLogger(__name__)

DIMENSIONAL_METRIC_DATA_TEMP = [] # TODO: REMOVE THIS


class Session(object):
PROTOCOL = AgentProtocol
Expand Down Expand Up @@ -125,10 +123,8 @@ def send_custom_events(self, sampling_info, custom_event_data):

def send_ml_events(self, sampling_info, custom_event_data):
"""Called to submit sample set for machine learning events."""

# TODO Make this send to MELT/OTLP endpoint instead of agent listener
payload = (self.agent_run_id, sampling_info, custom_event_data) # TODO this payload will be different
return self._protocol.send("custom_event_data", payload)
payload = encode_ml_event_data(custom_event_data, str(self.agent_run_id))
return self._otlp_protocol.send("ml_event_data", payload, path="/v1/logs")

def send_span_events(self, sampling_info, span_event_data):
"""Called to submit sample set for span events."""
Expand Down
39 changes: 31 additions & 8 deletions newrelic/core/otlp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import logging

from newrelic.common.encoding_utils import json_encode
from newrelic.core.stats_engine import CountStats, TimeStats
from newrelic.core.config import global_settings
from newrelic.core.stats_engine import CountStats, TimeStats

_logger = logging.getLogger(__name__)

Expand All @@ -33,7 +33,7 @@
try:
from newrelic.packages.opentelemetry_proto.common_pb2 import AnyValue, KeyValue
from newrelic.packages.opentelemetry_proto.logs_pb2 import (
LogRecord,
LogsData,
ResourceLogs,
ScopeLogs,
)
Expand All @@ -58,8 +58,8 @@
except Exception:
if otlp_content_setting == "protobuf":
raise # Reraise exception if content type explicitly set
else: # Fallback to JSON
otlp_content_setting = "json"
# Fallback to JSON
otlp_content_setting = "json"


if otlp_content_setting == "json":
Expand All @@ -77,7 +77,7 @@
ValueAtQuantile = dict
ResourceLogs = dict
ScopeLogs = dict
LogRecord = dict
LogsData = dict

AGGREGATION_TEMPORALITY_DELTA = 1
OTLP_CONTENT_TYPE = "application/json"
Expand All @@ -88,9 +88,8 @@ def otlp_encode(payload):
_logger.warning(
"Using OTLP integration while protobuf is not installed. This may result in larger payload sizes and data loss."
)
return json_encode(payload)
else:
return payload.SerializeToString()
return json_encode(payload).encode("utf-8")
return payload.SerializeToString()


def create_key_value(key, value):
Expand Down Expand Up @@ -216,3 +215,27 @@ def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=N
)
]
)


def encode_ml_event_data(custom_event_data, agent_run_id):
    """Build the OTLP logs payload for a batch of machine learning events.

    :param custom_event_data: Iterable of ``(event_info, event_attrs)`` pairs.
        ``event_info`` must provide the ``"type"`` and ``"timestamp"`` keys;
        ``event_attrs`` is the mapping of user supplied event attributes.
    :param agent_run_id: Agent run identifier, attached to every event as the
        ``real_agent_id`` attribute.
    :return: A ``LogsData`` object holding one resource with one scope whose
        log records are the encoded events.
    """
    resource = create_resource()
    ml_events = []
    for event_info, event_attrs in custom_event_data:
        # Work on a copy: the incoming attribute dicts belong to the caller
        # and must not pick up the OTLP bookkeeping attributes added below.
        attributes = dict(event_attrs)
        attributes.update(
            {
                "real_agent_id": agent_run_id,
                "event.domain": "newrelic.ml_events",
                "event.name": event_info["type"],
            }
        )
        # NOTE(review): the factor implies the timestamp is in milliseconds
        # (ms -> ns) - confirm against the event producer. An integer factor
        # keeps the arithmetic exact for integer timestamps; a float multiply
        # would round values above 2**53.
        unix_nano_timestamp = int(event_info["timestamp"] * 1000000)
        ml_events.append(
            {
                "time_unix_nano": unix_nano_timestamp,
                "attributes": create_key_values_from_iterable(attributes),
            }
        )

    return LogsData(resource_logs=[ResourceLogs(resource=resource, scope_logs=[ScopeLogs(log_records=ml_events)])])
4 changes: 2 additions & 2 deletions newrelic/hooks/mlmodel_sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def create_label_event(transaction, class_, inference_id, instance, return_val):
# Don't include the raw value when inference_event_value is disabled.
if settings and settings.machine_learning.inference_events_value.enabled:
event["label_value"] = str(value)
transaction.record_custom_event("inferenceData", event)
transaction.record_ml_event("InferenceData", event)


def _get_label_names(user_defined_label_names, prediction_array):
Expand Down Expand Up @@ -319,7 +319,7 @@ def create_feature_event(transaction, class_, inference_id, instance, args, kwar
# Don't include the raw value when inference_event_value is disabled.
if settings and settings.machine_learning and settings.machine_learning.inference_events_value.enabled:
event["feature_value"] = str(value)
transaction.record_custom_event("inferenceData", event)
transaction.record_ml_event("InferenceData", event)


def _nr_instrument_model(module, model_class):
Expand Down
66 changes: 66 additions & 0 deletions tests/agent_features/test_ml_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,27 @@
reset_core_stats_engine,
)
from testing_support.validators.validate_ml_event_count import validate_ml_event_count
from testing_support.validators.validate_ml_event_payload import (
validate_ml_event_payload,
)
from testing_support.validators.validate_ml_events import validate_ml_events
from testing_support.validators.validate_ml_events_outside_transaction import (
validate_ml_events_outside_transaction,
)

import newrelic.core.otlp_utils
from newrelic.api.application import application_instance as application
from newrelic.api.background_task import background_task
from newrelic.api.transaction import record_ml_event
from newrelic.core.config import global_settings
from newrelic.packages import six

try:
# python 2.x
reload
except NameError:
# python 3.x
from importlib import reload

_now = time.time()

Expand All @@ -38,6 +51,38 @@
}


@pytest.fixture(scope="session")
def core_app(collector_agent_registration):
    """Return the core application object behind the registered test app.

    The lookup goes through the registered application's agent, keyed by the
    application's own name.
    """
    registered = collector_agent_registration
    agent = registered._agent
    return agent.application(registered.name)


@validate_ml_event_payload(
    [
        {
            "foo": "bar",
            "real_agent_id": "1234567",
            "event.domain": "newrelic.ml_events",
            "event.name": "InferenceEvent",
        }
    ]
)
@reset_core_stats_engine()
def test_ml_event_payload_inside_transaction(core_app):
    """Record one ML event inside a background task, then force a harvest.

    The expected attributes of the harvested payload are declared on the
    ``validate_ml_event_payload`` decorator.
    """

    @background_task(name="test_ml_event_payload_inside_transaction")
    def _record_in_transaction():
        event_params = {"foo": "bar"}
        record_ml_event("InferenceEvent", event_params)

    _record_in_transaction()
    core_app.harvest()


@validate_ml_event_payload(
    [
        {
            "foo": "bar",
            "real_agent_id": "1234567",
            "event.domain": "newrelic.ml_events",
            "event.name": "InferenceEvent",
        }
    ]
)
@reset_core_stats_engine()
def test_ml_event_payload_outside_transaction(core_app):
    """Record one ML event directly against the application, then harvest.

    No transaction is active; the event is recorded by passing the
    application instance explicitly.
    """
    record_ml_event("InferenceEvent", {"foo": "bar"}, application=application())
    core_app.harvest()


@pytest.mark.parametrize(
"params,expected",
[
Expand All @@ -47,6 +92,7 @@
],
ids=["Valid key/value", "Bad key", "Value too long"],
)
@reset_core_stats_engine()
def test_record_ml_event_inside_transaction(params, expected):
@validate_ml_events(expected)
@background_task()
Expand Down Expand Up @@ -75,6 +121,7 @@ def _test():
_test()


@reset_core_stats_engine()
@validate_ml_event_count(count=0)
@background_task()
def test_record_ml_event_inside_transaction_bad_event_type():
Expand All @@ -88,6 +135,7 @@ def test_record_ml_event_outside_transaction_bad_event_type():
record_ml_event("!@#$%^&*()", {"foo": "bar"}, application=app)


@reset_core_stats_engine()
@validate_ml_event_count(count=0)
@background_task()
def test_record_ml_event_inside_transaction_params_not_a_dict():
Expand Down Expand Up @@ -120,15 +168,33 @@ def test_ml_event_settings_check_ml_insights_enabled():


@override_application_settings({"ml_insights_events.enabled": False})
@reset_core_stats_engine()
@function_not_called("newrelic.api.transaction", "create_custom_event")
@background_task()
def test_transaction_create_ml_event_not_called():
    """With ML insights events disabled, recording inside a transaction must
    not reach ``create_custom_event`` in ``newrelic.api.transaction``."""
    event_type = "FooEvent"
    event_params = {"foo": "bar"}
    record_ml_event(event_type, event_params)


@override_application_settings({"ml_insights_events.enabled": False})
@reset_core_stats_engine()
@function_not_called("newrelic.core.application", "create_custom_event")
@background_task()
def test_application_create_ml_event_not_called():
    """With ML insights events disabled, recording against an explicit
    application must not reach ``create_custom_event`` in
    ``newrelic.core.application``."""
    record_ml_event("FooEvent", {"foo": "bar"}, application=application())


@pytest.fixture(scope="module", autouse=True, params=["protobuf", "json"])
def otlp_content_encoding(request):
    """Run every test in this module once per OTLP content encoding.

    The debug setting is switched to the requested encoding and
    ``newrelic.core.otlp_utils`` is reloaded so its module-level
    ``otlp_content_setting`` matches the setting. On teardown the previous
    setting is restored and the module is reloaded again, so the encoding
    chosen here does not leak into test modules that run later in the same
    session.
    """
    if six.PY2 and request.param == "protobuf":
        pytest.skip("OTLP protos are not compatible with Python 2.")

    _settings = global_settings()
    prev = _settings.debug.otlp_content_encoding
    _settings.debug.otlp_content_encoding = request.param
    try:
        reload(newrelic.core.otlp_utils)
        assert newrelic.core.otlp_utils.otlp_content_setting == request.param, "Content encoding mismatch."

        yield
    finally:
        # Restore even when the setup reload/assert fails, and reload so the
        # module state matches the restored setting again.
        _settings.debug.otlp_content_encoding = prev
        reload(newrelic.core.otlp_utils)
96 changes: 0 additions & 96 deletions tests/mlmodel_sklearn/_validate_custom_events.py

This file was deleted.

Loading

0 comments on commit 8e45f4d

Please sign in to comment.