Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(new_relic sink): Multiple fixes related to metrics #18151

Merged
merged 26 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
05025b1
Convert tags into attributes.
asllop Sep 15, 2022
4cdef4e
Merge branch 'vectordotdev:master' into master
asllop Sep 15, 2022
087d4bf
Decompose metric into parts to reuse tags instead of cloning.
asllop Sep 16, 2022
2dd2668
Remove unnecessary blocks.
asllop Sep 16, 2022
6020efb
Set metric type.
asllop Sep 22, 2022
9e7d57c
Fix field name typo.
asllop Sep 22, 2022
567f236
Remove unnecessary brackets.
asllop Sep 23, 2022
c3b4784
Merge branch 'master' into master
asllop Sep 23, 2022
b755769
Cargo fmt.
asllop Oct 20, 2022
67943c9
Merge branch 'master' of https://github.com/asllop/vector
asllop Oct 20, 2022
48f16d8
Update src/sinks/new_relic/model.rs
asllop Nov 3, 2022
af14a7d
Update src/sinks/new_relic/model.rs
asllop Nov 3, 2022
137943c
Fix typo.
asllop Nov 3, 2022
2a2282c
Add tests for newly supported metrics.
asllop Nov 3, 2022
b72faed
Simplify metric type cases.
asllop Nov 3, 2022
84a8d61
Merge branch 'vectordotdev:master' into master
asllop Nov 4, 2022
41b41ab
Cargo fmt.
asllop Dec 12, 2022
f247c95
Merge remote-tracking branch 'origin/master' into asllop/master
jszwedko Aug 3, 2023
62addd3
Update to handle new tag model
jszwedko Aug 3, 2023
1f2bc83
PR feedback
jszwedko Aug 4, 2023
f256b33
refactor
dsmith3197 Aug 11, 2023
0ecfa81
add dropped event metrics
dsmith3197 Aug 22, 2023
c3769db
track unsupported metric types as well
dsmith3197 Aug 22, 2023
494b12e
merge master
dsmith3197 Aug 22, 2023
c11c958
feedback
dsmith3197 Sep 11, 2023
4e4943f
more feedback
dsmith3197 Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 156 additions & 68 deletions src/sinks/new_relic/model.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use std::{collections::HashMap, convert::TryFrom, fmt::Debug, time::SystemTime};
use std::{
collections::{BTreeMap, HashMap},
convert::TryFrom,
fmt::Debug,
time::SystemTime,
};

use chrono::{DateTime, Utc};
use ordered_float::NotNan;
use serde::{Deserialize, Serialize};
use vector_common::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
use vrl::event_path;

use super::NewRelicSinkError;
use crate::event::{Event, MetricValue, Value};
use crate::event::{Event, MetricKind, MetricValue, Value};

#[derive(Debug)]
pub enum NewRelicApiModel {
Expand All @@ -22,30 +28,9 @@ type DataStore = HashMap<String, Vec<KeyValData>>;
pub struct MetricsApiModel(pub Vec<DataStore>);

impl MetricsApiModel {
pub fn new(metric_array: Vec<(Value, Value, Value)>) -> Self {
let mut metric_data_array = vec![];
for (m_name, m_value, m_timestamp) in metric_array {
let mut metric_data = KeyValData::new();
metric_data.insert("name".to_owned(), m_name);
metric_data.insert("value".to_owned(), m_value);
match m_timestamp {
Value::Timestamp(ts) => {
metric_data.insert("timestamp".to_owned(), Value::from(ts.timestamp()));
}
Value::Integer(i) => {
metric_data.insert("timestamp".to_owned(), Value::from(i));
}
_ => {
metric_data.insert(
"timestamp".to_owned(),
Value::from(DateTime::<Utc>::from(SystemTime::now()).timestamp()),
);
}
}
metric_data_array.push(metric_data);
}
pub fn new(metric_array: Vec<KeyValData>) -> Self {
let mut metric_store = DataStore::new();
metric_store.insert("metrics".to_owned(), metric_data_array);
metric_store.insert("metrics".to_owned(), metric_array);
Self(vec![metric_store])
}
}
Expand All @@ -54,43 +39,107 @@ impl TryFrom<Vec<Event>> for MetricsApiModel {
type Error = NewRelicSinkError;

fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut metric_array = vec![];

for buf_event in buf_events {
if let Event::Metric(metric) = buf_event {
// Future improvement: put metric type. If type = count, NR metric model requires an interval.ms field, that is not provided by the Vector Metric model.
match metric.value() {
MetricValue::Gauge { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
}
MetricValue::Counter { value } => {
metric_array.push((
Value::from(metric.name().to_owned()),
Value::from(
NotNan::new(*value).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
})?,
),
Value::from(metric.timestamp()),
));
let mut num_non_metric_events = 0;
let mut num_missing_interval = 0;
let mut num_nan_value = 0;
let mut num_unsupported_metric_type = 0;

let metric_array: Vec<_> = buf_events
.into_iter()
.filter_map(|event| {
let metric = event.try_into_metric().or_else(|| {
num_non_metric_events += 1;
None
})?;
dsmith3197 marked this conversation as resolved.
Show resolved Hide resolved

// Generate Value::Object() from BTreeMap<String, String>
let (series, data, _) = metric.into_parts();

let mut metric_data = KeyValData::new();

// We only handle gauge and counter metrics
// Extract value & type and set type-related attributes
let (value, metric_type) = match (data.value, &data.kind) {
(MetricValue::Counter { value }, MetricKind::Incremental) => {
let interval_ms = data.time.interval_ms.or_else(|| {
// Incremental counter without an interval is worthless, skip this metric
num_missing_interval += 1;
None
})?;
metric_data.insert(
"interval.ms".to_owned(),
Value::from(interval_ms.get() as i64),
);
(value, "count")
}
(MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"),
(MetricValue::Gauge { value }, _) => (value, "gauge"),
_ => {
// Unrecognized metric type
// Unsupported metric type
num_unsupported_metric_type += 1;
return None;
}
};

// Set name, type, value, timestamp, and attributes
metric_data.insert("name".to_owned(), Value::from(series.name.name));
metric_data.insert("type".to_owned(), Value::from(metric_type));
let value = NotNan::new(value).ok().or_else(|| {
num_nan_value += 1;
None
})?;
metric_data.insert("value".to_owned(), Value::from(value));
metric_data.insert(
"timestamp".to_owned(),
Value::from(
data.time
.timestamp
.unwrap_or_else(|| DateTime::<Utc>::from(SystemTime::now()))
.timestamp(),
),
);
if let Some(tags) = series.tags {
metric_data.insert(
"attributes".to_owned(),
Value::from(
tags.iter_single()
.map(|(key, value)| (key.to_string(), Value::from(value)))
.collect::<BTreeMap<_, _>>(),
),
);
}
}

Some(metric_data)
})
.collect();

if num_non_metric_events > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_non_metric_events,
reason: "non-metric event"
});
}
if num_unsupported_metric_type > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_unsupported_metric_type,
reason: "unsupported metric type"
});
}
if num_nan_value > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: num_nan_value,
reason: "NaN value not supported"
});
}
if num_missing_interval > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: num_missing_interval,
reason: "incremental counter missing interval"
});
}
Comment on lines +116 to 139
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere, I'm not a fan of emitting these events within a TryFrom implementation but it made the most sense given the existing implementation. I'm happy to consider alternatives if others have strong opinions.


if !metric_array.is_empty() {
Ok(MetricsApiModel::new(metric_array))
Ok(Self::new(metric_array))
} else {
Err(NewRelicSinkError::new("No valid metrics to generate"))
}
Expand All @@ -110,9 +159,17 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
type Error = NewRelicSinkError;

fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut events_array = vec![];
for buf_event in buf_events {
if let Event::Log(log) = buf_event {
let mut num_non_log_events = 0;
let mut num_nan_value = 0;

let events_array: Vec<HashMap<String, Value>> = buf_events
.into_iter()
.filter_map(|event| {
let log = event.try_into_log().or_else(|| {
num_non_log_events += 1;
None
})?;

let mut event_model = KeyValData::new();
for (k, v) in log.convert_to_fields() {
event_model.insert(k, v.clone());
Expand All @@ -133,8 +190,9 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
if let Some(f) = n.as_f64() {
event_model.insert(
k,
Value::from(NotNan::new(f).map_err(|_| {
NewRelicSinkError::new("NaN value not supported")
Value::from(NotNan::new(f).ok().or_else(|| {
num_nan_value += 1;
None
})?),
);
} else {
Expand All @@ -144,7 +202,9 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
serde_json::Value::Bool(b) => {
event_model.insert(k, Value::from(b));
}
_ => {}
_ => {
// Note that arrays and nested objects are silently dropped.
}
}
}
event_model.remove("message");
Expand All @@ -156,8 +216,21 @@ impl TryFrom<Vec<Event>> for EventsApiModel {
.insert("eventType".to_owned(), Value::from("VectorSink".to_owned()));
}

events_array.push(event_model);
}
Some(event_model)
})
.collect();

if num_non_log_events > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_non_log_events,
reason: "non-log event"
});
}
if num_nan_value > 0 {
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: num_nan_value,
reason: "NaN value not supported"
});
}

if !events_array.is_empty() {
Expand All @@ -183,9 +256,16 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
type Error = NewRelicSinkError;

fn try_from(buf_events: Vec<Event>) -> Result<Self, Self::Error> {
let mut logs_array = vec![];
for buf_event in buf_events {
if let Event::Log(log) = buf_event {
let mut num_non_log_events = 0;

let logs_array: Vec<HashMap<String, Value>> = buf_events
.into_iter()
.filter_map(|event| {
let log = event.try_into_log().or_else(|| {
num_non_log_events += 1;
None
})?;

let mut log_model = KeyValData::new();
for (k, v) in log.convert_to_fields() {
log_model.insert(k, v.clone());
Expand All @@ -196,8 +276,16 @@ impl TryFrom<Vec<Event>> for LogsApiModel {
Value::from("log from vector".to_owned()),
);
}
logs_array.push(log_model);
}

Some(log_model)
})
.collect();

if num_non_log_events > 0 {
emit!(ComponentEventsDropped::<INTENTIONAL> {
count: num_non_log_events,
reason: "non-log event"
});
}

if !logs_array.is_empty() {
Expand Down
33 changes: 30 additions & 3 deletions src/sinks/new_relic/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
use std::{collections::HashMap, convert::TryFrom, num::NonZeroU32, time::SystemTime};

use chrono::{DateTime, Utc};
use futures::{future::ready, stream};
Expand Down Expand Up @@ -211,7 +211,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -235,7 +235,7 @@ fn generate_metric_api_model() {
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Logs data store not present");
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
Expand All @@ -246,4 +246,31 @@ fn generate_metric_api_model() {
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());

// Incremental counter
let m = Metric::new(
"my_metric",
MetricKind::Incremental,
MetricValue::Counter { value: 100.0 },
)
.with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
.with_interval_ms(NonZeroU32::new(1000));
let event = Event::Metric(m);
let model =
MetricsApiModel::try_from(vec![event]).expect("Failed mapping metrics into API model");
let metrics = model.0[0]
.get("metrics")
.expect("Metric data store not present");

assert_eq!(metrics.len(), 1);
assert!(metrics[0].get("name").is_some());
assert_eq!(
metrics[0].get("name").unwrap().to_string_lossy(),
"my_metric".to_owned()
);
assert!(metrics[0].get("value").is_some());
assert_eq!(metrics[0].get("value").unwrap(), &Value::from(100.0));
assert!(metrics[0].get("timestamp").is_some());
assert!(metrics[0].get("interval.ms").is_some());
assert_eq!(metrics[0].get("interval.ms").unwrap(), &Value::from(1000));
}