feat: track runtime schema definitions for log events (#17692)
closes #16732

In order for sinks to use semantic meaning, they need a mapping of
meanings to fields. This is included in the schema definition of events,
but the exact definition that needs to be used depends on the path the
event took to get to the sink. The schema definition of an event is
tracked at runtime so this can be determined.

An `upstream_id` (the id of the parent component's output) was added to
event metadata to track the previous component that an event came from,
which lets the topology select the correct schema definition to attach
to events.

For sources, there is only one definition that can be attached (for each
port). This is automatically attached in the topology layer (after an
event is emitted by a source), so there is no additional work in each
source to support this.
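
A minimal sketch of the source case, under stated assumptions: `Definition`
and `Event` below are simplified, hypothetical stand-ins for
`schema::Definition` and the real event type, and `attach_source_definition`
is an illustrative name rather than the function the topology actually
uses. It shows why sharing the per-port definition behind an `Arc` keeps
the attach step cheap: each event only receives a pointer clone.

```rust
use std::sync::Arc;

/// Hypothetical, simplified stand-in for `schema::Definition`.
#[derive(Debug, Default, PartialEq)]
pub struct Definition {
    /// (meaning, field path) pairs, e.g. ("message", ".msg").
    pub meanings: Vec<(String, String)>,
}

/// Hypothetical, simplified stand-in for an event carrying a definition.
#[derive(Debug)]
pub struct Event {
    pub schema_definition: Arc<Definition>,
}

/// Attaches the single definition of a source output port to every event
/// emitted on that port. Only the `Arc` pointer is cloned, never the
/// definition itself.
pub fn attach_source_definition(events: &mut [Event], port_definition: &Arc<Definition>) {
    for event in events.iter_mut() {
        event.schema_definition = Arc::clone(port_definition);
    }
}
```

Because this runs after the source has emitted its events, the source
itself never needs to know about the definition.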

For transforms, it's slightly more complicated. The schema definition
depends on both the output port _and_ the component the event came from.
A map is generated at Vector startup, and the correct definition is
obtained from that at runtime. This also happens in the topology layer
so transforms don't need to worry about this.
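
The lookup described above can be sketched as follows. This is an
illustration under assumptions, not the topology code from this commit:
`OutputId`, `Definition`, and `Event` are simplified stand-ins, and
`DefinitionMap` and `attach_definition` are hypothetical names. Updating
`upstream_id` to the current output after the lookup is also an assumption
about how the next component would find its own entry.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for `OutputId` (component name plus optional port).
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OutputId {
    pub component: String,
    pub port: Option<String>,
}

/// Simplified stand-in for `schema::Definition` (meaning -> field path).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Definition {
    pub meanings: HashMap<String, String>,
}

/// Simplified stand-in for an event with the metadata relevant here.
pub struct Event {
    pub upstream_id: Option<Arc<OutputId>>,
    pub schema_definition: Arc<Definition>,
}

/// Hypothetical map for one transform output port, built once at startup:
/// upstream output -> definition to attach.
pub type DefinitionMap = HashMap<OutputId, Arc<Definition>>;

/// Selects the definition for the event's upstream component, attaches it,
/// and records this output as the new upstream. Returns whether a
/// definition was found.
pub fn attach_definition(
    event: &mut Event,
    definitions: &DefinitionMap,
    this_output: &Arc<OutputId>,
) -> bool {
    let found = event
        .upstream_id
        .as_deref()
        .and_then(|id| definitions.get(id))
        .cloned();
    let attached = match found {
        Some(definition) => {
            event.schema_definition = definition;
            true
        }
        None => false,
    };
    // Assumption: the next component looks the event up by this output id.
    event.upstream_id = Some(Arc::clone(this_output));
    attached
}
```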

Previously the `remap` transform had custom code to support runtime
schema definitions (for the VRL meaning functions). This was removed
since it's now handled automatically.

The `reduce` and `lua` transforms are special cases because an event
takes no clear "path" through the topology. In `reduce`, multiple events
(from different inputs) can be merged. In `lua`, output events may not
be related to input events at all. In these cases the definitions of all
inputs are merged, so the schema definition map has the same value for
every input, and the topology can arbitrarily pick one.
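
A rough sketch of the "same value for all inputs" idea, with loud
caveats: `Definition` here is a hypothetical stand-in that only tracks
meanings, `merge` keeps the first path seen for a meaning (merging real
schema definitions also involves type information and is more involved),
and `merged_for_all_inputs` is an illustrative name.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for `schema::Definition` (meaning -> field path).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Definition {
    pub meanings: BTreeMap<String, String>,
}

impl Definition {
    /// Simplified merge: keeps existing meanings and adds any new ones.
    pub fn merge(mut self, other: &Definition) -> Self {
        for (meaning, path) in &other.meanings {
            self.meanings
                .entry(meaning.clone())
                .or_insert_with(|| path.clone());
        }
        self
    }
}

/// Builds a map in which every input id points at the same merged
/// definition, so it does not matter which entry is picked at runtime.
pub fn merged_for_all_inputs(inputs: &[(String, Definition)]) -> HashMap<String, Definition> {
    let merged = inputs
        .iter()
        .fold(Definition::default(), |acc, (_, definition)| acc.merge(definition));
    inputs
        .iter()
        .map(|(id, _)| (id.clone(), merged.clone()))
        .collect()
}
```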

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
Co-authored-by: Stephen Wakely <fungus.humungus@gmail.com>
fuchsnj and StephenWakely authored Jun 28, 2023
1 parent f60fe00 commit ee5b389
Showing 25 changed files with 847 additions and 428 deletions.
2 changes: 1 addition & 1 deletion lib/vector-core/Cargo.toml
@@ -94,7 +94,7 @@ rand = "0.8.5"
 rand_distr = "0.4.3"
 tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] }
 vector-common = { path = "../vector-common", default-features = false, features = ["test"] }
-vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua"] }
+vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua", "test"] }

 [features]
 api = ["dep:async-graphql"]
28 changes: 13 additions & 15 deletions lib/vector-core/src/config/mod.rs
@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use std::{collections::HashMap, fmt, num::NonZeroUsize};

 use bitmask_enum::bitmask;
@@ -111,7 +112,7 @@ pub struct SourceOutput {
     // NOTE: schema definitions are only implemented/supported for log-type events. There is no
     // inherent blocker to support other types as well, but it'll require additional work to add
     // the relevant schemas, and store them separately in this type.
-    pub schema_definition: Option<schema::Definition>,
+    pub schema_definition: Option<Arc<schema::Definition>>,
 }

 impl SourceOutput {
@@ -129,7 +130,7 @@ impl SourceOutput {
         Self {
             port: None,
             ty,
-            schema_definition: Some(schema_definition),
+            schema_definition: Some(Arc::new(schema_definition)),
         }
     }

@@ -168,17 +169,15 @@ impl SourceOutput {
     /// Schema enabled is set in the users configuration.
     #[must_use]
     pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
+        use std::ops::Deref;
+
         self.schema_definition.as_ref().map(|definition| {
             if schema_enabled {
-                definition.clone()
+                definition.deref().clone()
             } else {
                 let mut new_definition =
                     schema::Definition::default_for_namespace(definition.log_namespaces());
-
-                if definition.log_namespaces().contains(&LogNamespace::Vector) {
-                    new_definition.add_meanings(definition.meanings());
-                }
-
+                new_definition.add_meanings(definition.meanings());
                 new_definition
             }
         })
@@ -203,7 +202,7 @@ pub struct TransformOutput {
     /// enabled, at least one definition should be output. If the transform
     /// has multiple connected sources, it is possible to have multiple output
     /// definitions - one for each input.
-    log_schema_definitions: HashMap<OutputId, schema::Definition>,
+    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
 }

 impl TransformOutput {
@@ -245,11 +244,7 @@ impl TransformOutput {
                 .map(|(output, definition)| {
                     let mut new_definition =
                         schema::Definition::default_for_namespace(definition.log_namespaces());
-
-                    if definition.log_namespaces().contains(&LogNamespace::Vector) {
-                        new_definition.add_meanings(definition.meanings());
-                    }
-
+                    new_definition.add_meanings(definition.meanings());
                     (output.clone(), new_definition)
                 })
                 .collect()
@@ -606,7 +601,10 @@ mod test {

         // There should be the default legacy definition without schemas enabled.
         assert_eq!(
-            Some(schema::Definition::default_legacy_namespace()),
+            Some(
+                schema::Definition::default_legacy_namespace()
+                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
+            ),
             output.schema_definition(false)
         );
     }
27 changes: 25 additions & 2 deletions lib/vector-core/src/event/metadata.rs
@@ -7,7 +7,10 @@ use vector_common::{config::ComponentKey, EventDataEq};
 use vrl::value::{Kind, Secrets, Value};

 use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus};
-use crate::{config::LogNamespace, schema, ByteSizeOf};
+use crate::{
+    config::{LogNamespace, OutputId},
+    schema, ByteSizeOf,
+};

 const DATADOG_API_KEY: &str = "datadog_api_key";
 const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
@@ -30,8 +33,15 @@ pub struct EventMetadata {
     /// The id of the source
     source_id: Option<Arc<ComponentKey>>,

+    /// The id of the component this event originated from. This is used to
+    /// determine which schema definition to attach to an event in transforms.
+    /// This should always have a value set for events in transforms. It will always be `None`
+    /// in a source, and there is currently no use-case for reading the value in a sink.
+    upstream_id: Option<Arc<OutputId>>,
+
     /// An identifier for a globally registered schema definition which provides information about
     /// the event shape (type information, and semantic meaning of fields).
+    /// This definition is only currently valid for logs, and shouldn't be used for other event types.
     ///
     /// TODO(Jean): must not skip serialization to track schemas across restarts.
     #[serde(default = "default_schema_definition", skip)]
@@ -71,17 +81,29 @@ impl EventMetadata {
         &mut self.secrets
     }

-    /// Returns a reference to the metadata source.
+    /// Returns a reference to the metadata source id.
     #[must_use]
     pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
         self.source_id.as_ref()
     }

+    /// Returns a reference to the metadata parent id. This is the `OutputId`
+    /// of the previous component the event was sent through (if any).
+    #[must_use]
+    pub fn upstream_id(&self) -> Option<&OutputId> {
+        self.upstream_id.as_deref()
+    }
+
     /// Sets the `source_id` in the metadata to the provided value.
     pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
         self.source_id = Some(source_id);
     }

+    /// Sets the `upstream_id` in the metadata to the provided value.
+    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
+        self.upstream_id = Some(upstream_id);
+    }
+
     /// Return the datadog API key, if it exists
     pub fn datadog_api_key(&self) -> Option<Arc<str>> {
         self.secrets.get(DATADOG_API_KEY).cloned()
@@ -111,6 +133,7 @@ impl Default for EventMetadata {
             finalizers: Default::default(),
             schema_definition: default_schema_definition(),
             source_id: None,
+            upstream_id: None,
         }
     }
 }
14 changes: 13 additions & 1 deletion lib/vector-core/src/event/mod.rs
@@ -5,7 +5,7 @@ use std::{
     sync::Arc,
 };

-use crate::ByteSizeOf;
+use crate::{config::OutputId, ByteSizeOf};
 pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
 pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
 pub use finalization::{
@@ -309,12 +309,24 @@ impl Event {
         self.metadata_mut().set_source_id(source_id);
     }

+    /// Sets the `upstream_id` in the event metadata to the provided value.
+    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
+        self.metadata_mut().set_upstream_id(upstream_id);
+    }
+
     /// Sets the `source_id` in the event metadata to the provided value.
     #[must_use]
     pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
         self.metadata_mut().set_source_id(source_id);
         self
     }
+
+    /// Sets the `upstream_id` in the event metadata to the provided value.
+    #[must_use]
+    pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
+        self.metadata_mut().set_upstream_id(upstream_id);
+        self
+    }
 }

 impl EventDataEq for Event {