Skip to content

Commit

Permalink
feat: Migrate LogSchema source_type_key to new lookup code (#17947)
Browse files Browse the repository at this point in the history
This is part of #13033.

Tried this with a config where `source_type_key` is:
* `""` --> no source entry
* not defined --> same as above
* `"foo"` --> `"foo":"demo_logs"`
* `"foo.bar"` --> `"foo":{"bar":"demo_logs"}`
* `"foo.."` --> `Configuration error. error=Invalid field path "foo.."`

The config:
```
data_dir = "/Users/pavlos.rontidis/my_tests/vector"

[log_schema]
source_type_key = "foo"

[sources.source0]
format = "json"
type = "demo_logs"

[sinks.sink0]
inputs = ["source0"]
target = "stdout"
type = "console"

[sinks.sink0.encoding]
codec = "json"
```
  • Loading branch information
pront authored Jul 13, 2023
1 parent f81ee19 commit baf56e1
Show file tree
Hide file tree
Showing 35 changed files with 240 additions and 140 deletions.
2 changes: 1 addition & 1 deletion lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl ResourceLog {

log_namespace.insert_vector_metadata(
&mut log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(SOURCE_NAME.as_bytes()),
);
Expand Down
25 changes: 12 additions & 13 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
use lookup::lookup_v2::OptionalValuePath;
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::parse_target_path;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -60,7 +61,7 @@ pub struct LogSchema {
///
/// This field will be set by the Vector source that the event was created in.
#[serde(default = "LogSchema::default_source_type_key")]
source_type_key: String,
source_type_key: OptionalValuePath,

/// The name of the event field to set the event metadata in.
///
Expand Down Expand Up @@ -88,17 +89,15 @@ impl LogSchema {
}

fn default_timestamp_key() -> OptionalValuePath {
OptionalValuePath {
path: Some(owned_value_path!("timestamp")),
}
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
}

fn default_source_type_key() -> String {
String::from("source_type")
fn default_source_type_key() -> OptionalValuePath {
OptionalValuePath::new("source_type")
}

fn default_metadata_key() -> String {
Expand Down Expand Up @@ -126,8 +125,8 @@ impl LogSchema {
&self.host_key
}

pub fn source_type_key(&self) -> &str {
&self.source_type_key
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.path.as_ref()
}

pub fn metadata_key(&self) -> &str {
Expand All @@ -146,8 +145,8 @@ impl LogSchema {
self.host_key = v;
}

pub fn set_source_type_key(&mut self, v: String) {
self.source_type_key = v;
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalValuePath { path };
}

pub fn set_metadata_key(&mut self, v: String) {
Expand Down Expand Up @@ -191,7 +190,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
} else {
self.set_source_type_key(other.source_type_key().to_string());
self.set_source_type_key(other.source_type_key().cloned());
}
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
&& self.metadata_key() != other.metadata_key()
Expand Down
7 changes: 4 additions & 3 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl LogNamespace {
) {
self.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
Expand Down Expand Up @@ -551,14 +551,15 @@ mod test {
use chrono::Utc;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use vector_common::btreemap;
use vrl::path::OwnedValuePath;
use vrl::value::Kind;

#[test]
fn test_insert_standard_vector_source_metadata() {
let nested_path = "a.b.c.d";
let nested_path = "a.b.c.d".to_string();

let mut schema = LogSchema::default();
schema.set_source_type_key(nested_path.to_owned());
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
init_log_schema(schema, false);

let namespace = LogNamespace::Legacy;
Expand Down
10 changes: 6 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,10 @@ impl LogEvent {
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
// "Global Log Schema" are updated to the new path lookup code
pub fn source_type_path(&self) -> &'static str {
pub fn source_type_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => "%vector.source_type",
LogNamespace::Legacy => log_schema().source_type_key(),
LogNamespace::Vector => Some("%vector.source_type".to_string()),
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
}
}

Expand Down Expand Up @@ -514,7 +514,9 @@ impl LogEvent {
pub fn get_source_type(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())),
LogNamespace::Legacy => log_schema()
.source_type_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions lib/vector-core/src/schema/definition.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};

use crate::config::{log_schema, LegacyKey, LogNamespace};
use lookup::lookup_v2::{parse_value_path, TargetPath};
use lookup::lookup_v2::TargetPath;
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use vrl::value::{kind::Collection, Kind};

Expand Down Expand Up @@ -144,9 +144,7 @@ impl Definition {
#[must_use]
pub fn with_standard_vector_source_metadata(self) -> Self {
self.with_vector_metadata(
parse_value_path(log_schema().source_type_key())
.ok()
.as_ref(),
log_schema().source_type_key(),
&owned_value_path!("source_type"),
Kind::bytes(),
None,
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use vector_config::configurable_component;
use vrl::owned_value_path;

use crate::lookup_v2::PathParseError;
use crate::{OwnedTargetPath, OwnedValuePath};
Expand Down Expand Up @@ -56,6 +57,12 @@ impl OptionalValuePath {
pub fn none() -> Self {
Self { path: None }
}

pub fn new(path: &str) -> Self {
Self {
path: Some(owned_value_path!(path)),
}
}
}

impl TryFrom<String> for OptionalValuePath {
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/datadog/events/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
}

if !log.contains("source_type_name") {
log.rename_key(log.source_type_path(), "source_type_name")
if let Some(source_type_path) = log.source_type_path() {
log.rename_key(source_type_path.as_str(), "source_type_name")
}
}

Some(Event::from(log))
Expand Down
20 changes: 11 additions & 9 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ impl SinkConfig for InfluxDbLogsConfig {
.source_type_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().source_type_key())
.expect("global log_schema.source_type_key to be valid path")
});
.or(log_schema().source_type_key().cloned())
.unwrap();

let sink = InfluxDbLogsSink {
uri,
Expand Down Expand Up @@ -280,11 +278,15 @@ impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
self.tags.replace(host_path.clone());
log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key));
}
self.tags.replace(log.source_type_path().to_string());
log.rename_key(
log.source_type_path(),
(PathPrefix::Event, &self.source_type_key),
);

if let Some(source_type_path) = log.source_type_path() {
self.tags.replace(source_type_path.clone());
log.rename_key(
source_type_path.as_str(),
(PathPrefix::Event, &self.source_type_key),
);
}

self.tags.replace("metric_type".to_string());
log.insert("metric_type", "logs");

Expand Down
7 changes: 5 additions & 2 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ fn populate_event(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
);
Expand Down Expand Up @@ -713,7 +713,10 @@ mod integration_test {
trace!("{:?}", log);
assert_eq!(log[log_schema().message_key()], "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(log[log_schema().source_type_key()], "amqp".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"amqp".into()
);
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub(super) async fn firehose(
if let Event::Log(ref mut log) = event {
log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
);
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ fn handle_single_log(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsS3Config::NAME.as_bytes()),
);
Expand Down
6 changes: 4 additions & 2 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub struct ApiKeyQueryParams {
pub(crate) struct DatadogAgentSource {
pub(crate) api_key_extractor: ApiKeyExtractor,
pub(crate) log_schema_host_key: &'static str,
pub(crate) log_schema_source_type_key: &'static str,
pub(crate) log_schema_source_type_key: String,
pub(crate) log_namespace: LogNamespace,
pub(crate) decoder: Decoder,
protocol: &'static str,
Expand Down Expand Up @@ -334,7 +334,9 @@ impl DatadogAgentSource {
.expect("static regex always compiles"),
},
log_schema_host_key: log_schema().host_key(),
log_schema_source_type_key: log_schema().source_type_key(),
log_schema_source_type_key: log_schema()
.source_type_key()
.map_or("".to_string(), |key| key.to_string()),
decoder,
protocol,
logs_schema_definition: Arc::new(logs_schema_definition),
Expand Down
40 changes: 32 additions & 8 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ async fn full_payload_v1() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -300,7 +303,10 @@ async fn full_payload_v2() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -362,7 +368,10 @@ async fn no_api_key() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -423,7 +432,10 @@ async fn api_key_in_url() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -488,7 +500,10 @@ async fn api_key_in_query_params() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -559,7 +574,10 @@ async fn api_key_in_header() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -706,7 +724,10 @@ async fn ignores_api_key() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
event.metadata().schema_definition(),
Expand Down Expand Up @@ -1398,7 +1419,10 @@ async fn split_outputs() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down
4 changes: 2 additions & 2 deletions src/sources/datadog_agent/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn handle_dd_trace_payload_v1(
.set_datadog_api_key(Arc::clone(k));
}
trace_event.insert(
source.log_schema_source_type_key,
source.log_schema_source_type_key.as_str(),
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v2".to_string());
Expand Down Expand Up @@ -255,7 +255,7 @@ fn handle_dd_trace_payload_v0(
trace_event.insert("language_name", lang.clone());
}
trace_event.insert(
source.log_schema_source_type_key,
source.log_schema_source_type_key.as_str(),
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v1".to_string());
Expand Down
Loading

0 comments on commit baf56e1

Please sign in to comment.