diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index bff0afcb04023..27c653380bfb0 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -201,7 +201,7 @@ pub struct TransformOutput { /// enabled, at least one definition should be output. If the transform /// has multiple connected sources, it is possible to have multiple output /// definitions - one for each input. - pub log_schema_definitions: HashMap, + log_schema_definitions: HashMap, } impl TransformOutput { @@ -222,6 +222,37 @@ impl TransformOutput { self.port = Some(name.into()); self } + + /// Return the schema [`schema::Definition`] from this output. + /// + /// Takes a `schema_enabled` flag to determine if the full definition including the fields + /// and associated types should be returned, or if a simple definition should be returned. + /// A simple definition is just the default for the namespace. For the Vector namespace the + /// meanings are included. + /// Schema enabled is set in the users configuration. 
+ #[must_use] + pub fn schema_definitions( + &self, + schema_enabled: bool, + ) -> HashMap { + if schema_enabled { + self.log_schema_definitions.clone() + } else { + self.log_schema_definitions + .iter() + .map(|(output, definition)| { + let mut new_definition = + schema::Definition::default_for_namespace(definition.log_namespaces()); + + if definition.log_namespaces().contains(&LogNamespace::Vector) { + new_definition.add_meanings(definition.meanings()); + } + + (output.clone(), new_definition) + }) + .collect() + } + } } /// Simple utility function that can be used by transforms that make no changes to diff --git a/src/api/schema/components/mod.rs b/src/api/schema/components/mod.rs index d39898c256a45..6f096034ad87a 100644 --- a/src/api/schema/components/mod.rs +++ b/src/api/schema/components/mod.rs @@ -14,14 +14,13 @@ use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt}; use vector_config::NamedComponent; use vector_core::internal_event::DEFAULT_OUTPUT; -use crate::topology::schema::possible_definitions; use crate::{ api::schema::{ components::state::component_by_component_key, filter::{self, filter_items}, relay, sort, }, - config::{ComponentKey, Config}, + config::{get_transform_output_ids, ComponentKey, Config}, filter_check, }; @@ -254,7 +253,6 @@ impl ComponentsSubscription { /// Update the 'global' configuration that will be consumed by component queries pub fn update_config(config: &Config) { - let mut cache = HashMap::new(); let mut new_components = HashMap::new(); // Sources @@ -291,15 +289,13 @@ pub fn update_config(config: &Config) { component_key: component_key.clone(), component_type: transform.inner.get_component_name().to_string(), inputs: transform.inputs.clone(), - outputs: transform - .inner - .outputs( - &possible_definitions(&transform.inputs, config, &mut cache), - config.schema.log_namespace(), - ) - .into_iter() - .map(|output| output.port.unwrap_or_else(|| DEFAULT_OUTPUT.to_string())) - .collect(), + outputs: 
get_transform_output_ids( + transform.inner.as_ref(), + "".into(), + config.schema.log_namespace(), + ) + .map(|output| output.port.unwrap_or_else(|| DEFAULT_OUTPUT.to_string())) + .collect(), })), ); } diff --git a/src/config/compiler.rs b/src/config/compiler.rs index f4170505bf557..90ca43ddd7adf 100644 --- a/src/config/compiler.rs +++ b/src/config/compiler.rs @@ -1,7 +1,8 @@ use indexmap::IndexSet; use super::{ - builder::ConfigBuilder, graph::Graph, id::Inputs, schema, validation, Config, OutputId, + builder::ConfigBuilder, graph::Graph, id::Inputs, transform::get_transform_output_ids, + validation, Config, OutputId, }; pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec), Vec> { @@ -137,16 +138,7 @@ pub(crate) fn expand_globs(config: &mut ConfigBuilder) { }) }) .chain(config.transforms.iter().flat_map(|(key, t)| { - t.inner - .outputs( - &[(key.into(), schema::Definition::any())], - config.schema.log_namespace(), - ) - .into_iter() - .map(|output| OutputId { - component: key.clone(), - port: output.port, - }) + get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace()) })) .map(|output_id| output_id.to_string()) .collect::>(); diff --git a/src/config/graph.rs b/src/config/graph.rs index 1b91ed11d9ccc..859b590257de2 100644 --- a/src/config/graph.rs +++ b/src/config/graph.rs @@ -77,6 +77,7 @@ impl Graph { Node::Transform { in_ty: transform.inner.input().data_type(), outputs: transform.inner.outputs( + enrichment::TableRegistry::default(), &[(id.into(), schema::Definition::any())], schema.log_namespace(), ), diff --git a/src/config/mod.rs b/src/config/mod.rs index 8c3ceb9c999eb..473961f2eaf7c 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -54,7 +54,9 @@ pub use provider::ProviderConfig; pub use secret::SecretBackend; pub use sink::{SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter}; pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter}; -pub use transform::{BoxedTransform, 
TransformConfig, TransformContext, TransformOuter}; +pub use transform::{ + get_transform_output_ids, BoxedTransform, TransformConfig, TransformContext, TransformOuter, +}; pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult}; pub use validation::warnings; pub use vector_core::config::{ diff --git a/src/config/transform.rs b/src/config/transform.rs index e2447a70e8ad1..c2be848d53361 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -193,6 +193,7 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + /// of events flowing through the transform. fn outputs( &self, + enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], global_log_namespace: LogNamespace, ) -> Vec; @@ -236,3 +237,23 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + } dyn_clone::clone_trait_object!(TransformConfig); + +/// Often we want to call outputs just to retrieve the OutputId's without needing +/// the schema definitions. 
+pub fn get_transform_output_ids( + transform: &T, + key: ComponentKey, + global_log_namespace: LogNamespace, +) -> impl Iterator + '_ { + transform + .outputs( + enrichment::TableRegistry::default(), + &[(key.clone().into(), schema::Definition::any())], + global_log_namespace, + ) + .into_iter() + .map(move |output| OutputId { + component: key.clone(), + port: output.port, + }) +} diff --git a/src/config/unit_test/mod.rs b/src/config/unit_test/mod.rs index 273b5fc70b849..62b9dabcd70ad 100644 --- a/src/config/unit_test/mod.rs +++ b/src/config/unit_test/mod.rs @@ -20,7 +20,7 @@ pub use self::unit_test_components::{ UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig, UnitTestStreamSinkConfig, UnitTestStreamSourceConfig, }; -use super::{compiler::expand_globs, graph::Graph, OutputId}; +use super::{compiler::expand_globs, graph::Graph, transform::get_transform_output_ids, OutputId}; use crate::{ conditions::Condition, config::{ @@ -186,14 +186,11 @@ impl UnitTestBuildMetadata { .transforms .iter() .flat_map(|(key, transform)| { - transform - .inner - .outputs(&[], builder.schema.log_namespace()) - .into_iter() - .map(|output| OutputId { - component: key.clone(), - port: output.port, - }) + get_transform_output_ids( + transform.inner.as_ref(), + key.clone(), + builder.schema.log_namespace(), + ) }) .collect::>(); @@ -457,18 +454,13 @@ async fn build_unit_test( fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option> { let config = config.clone(); let transform_ids = config.transforms.iter().flat_map(|(key, transform)| { - transform - .inner - .outputs(&[], config.schema.log_namespace()) - .iter() - .map(|output| { - if let Some(port) = &output.port { - OutputId::from((key, port.clone())).to_string() - } else { - key.to_string() - } - }) - .collect::>() + get_transform_output_ids( + transform.inner.as_ref(), + key.clone(), + config.schema.log_namespace(), + ) + .map(|output| output.to_string()) + .collect::>() }); let mut 
loose_end_outputs = Vec::new(); diff --git a/src/config/validation.rs b/src/config/validation.rs index 2c971f602fed3..07ba4639dc71f 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -1,11 +1,14 @@ -use crate::{config::schema, topology::schema::possible_definitions}; +use crate::config::schema; use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use heim::{disk::Partition, units::information::byte}; use indexmap::IndexMap; use std::{collections::HashMap, path::PathBuf}; use vector_core::internal_event::DEFAULT_OUTPUT; -use super::{builder::ConfigBuilder, ComponentKey, Config, OutputId, Resource}; +use super::{ + builder::ConfigBuilder, transform::get_transform_output_ids, ComponentKey, Config, OutputId, + Resource, +}; /// Check that provide + topology config aren't present in the same builder, which is an error. pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec> { @@ -169,15 +172,12 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec> { errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}"))); } - if transform - .inner - .outputs( - &[(OutputId::dummy(), definition)], - config.schema.log_namespace(), - ) - .iter() - .map(|output| output.port.as_deref().unwrap_or("")) - .any(|name| name == DEFAULT_OUTPUT) + if get_transform_output_ids( + transform.inner.as_ref(), + key.clone(), + config.schema.log_namespace(), + ) + .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT)) { errors.push(format!( "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`" @@ -325,7 +325,6 @@ async fn process_partitions(partitions: Vec) -> heim::Result Vec { let mut warnings = vec![]; - let mut cache = HashMap::new(); let source_ids = config.sources.iter().flat_map(|(key, source)| { source @@ -342,21 +341,13 @@ pub fn warnings(config: &Config) -> Vec { .collect::>() }); let transform_ids = config.transforms.iter().flat_map(|(key, transform)| { - 
transform - .inner - .outputs( - &possible_definitions(&transform.inputs, config, &mut cache), - config.schema.log_namespace(), - ) - .iter() - .map(|output| { - if let Some(port) = &output.port { - ("transform", OutputId::from((key, port.clone()))) - } else { - ("transform", OutputId::from(key)) - } - }) - .collect::>() + get_transform_output_ids( + transform.inner.as_ref(), + key.clone(), + config.schema.log_namespace(), + ) + .map(|output| ("transform", output)) + .collect::>() }); for (input_type, id) in transform_ids.chain(source_ids) { diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 40c430e4e70a4..62b0d96d76f10 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -16,7 +16,7 @@ use self::{ BackpressureSourceConfig, BasicSourceConfig, ErrorSourceConfig, PanicSourceConfig, TripwireSourceConfig, }, - transforms::BasicTransformConfig, + transforms::{BasicTransformConfig, ErrorDefinitionTransformConfig}, }; pub mod sinks; @@ -66,6 +66,10 @@ pub fn basic_transform(suffix: &str, increase: f64) -> BasicTransformConfig { BasicTransformConfig::new(suffix.to_owned(), increase) } +pub const fn error_definition_transform() -> ErrorDefinitionTransformConfig { + ErrorDefinitionTransformConfig {} +} + pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig { BackpressureSinkConfig { num_to_consume } } diff --git a/src/test_util/mock/transforms/basic.rs b/src/test_util/mock/transforms/basic.rs index 90e4484a547bd..f1de0483da2a5 100644 --- a/src/test_util/mock/transforms/basic.rs +++ b/src/test_util/mock/transforms/basic.rs @@ -51,6 +51,7 @@ impl TransformConfig for BasicTransformConfig { fn outputs( &self, + _: enrichment::TableRegistry, definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/test_util/mock/transforms/error_definitions.rs b/src/test_util/mock/transforms/error_definitions.rs new file mode 100644 index 0000000000000..3d13402957dda --- /dev/null +++ 
b/src/test_util/mock/transforms/error_definitions.rs @@ -0,0 +1,62 @@ +use async_trait::async_trait; +use snafu::Snafu; +use value::Kind; +use vector_config::configurable_component; +use vector_core::{ + config::{DataType, Input, LogNamespace, TransformOutput}, + schema::Definition, + transform::Transform, +}; + +use crate::config::{OutputId, TransformConfig, TransformContext}; + +#[derive(Debug, Snafu)] +enum Error { + #[snafu(display("It all went horribly wrong"))] + ItAllWentHorriblyWrong, +} + +/// Configuration for the `test_error_definition` transform. +#[configurable_component(transform("test_error_definition", "Test (error definition)"))] +#[derive(Clone, Debug, Default)] +pub struct ErrorDefinitionTransformConfig {} + +impl_generate_config_from_default!(ErrorDefinitionTransformConfig); + +#[async_trait] +#[typetag::serde(name = "test_error_definition")] +impl TransformConfig for ErrorDefinitionTransformConfig { + fn input(&self) -> Input { + Input::all() + } + + fn outputs( + &self, + _: enrichment::TableRegistry, + definitions: &[(OutputId, Definition)], + _: LogNamespace, + ) -> Vec { + vec![TransformOutput::new( + DataType::all(), + definitions + .iter() + .map(|(output, definition)| { + ( + output.clone(), + // Return a definition of Kind::never implying that we can never return a value. + Definition::new_with_default_metadata( + Kind::never(), + definition.log_namespaces().clone(), + ), + ) + }) + .collect(), + )] + } + + async fn build(&self, _: &TransformContext) -> crate::Result { + // Even though the definitions returned were `Kind::never`, build needs to be + // called in order to return the Error. 
+ Err(Error::ItAllWentHorriblyWrong.into()) + } +} diff --git a/src/test_util/mock/transforms/mod.rs b/src/test_util/mock/transforms/mod.rs index 2b6af18c35219..57f558aa0f2e4 100644 --- a/src/test_util/mock/transforms/mod.rs +++ b/src/test_util/mock/transforms/mod.rs @@ -6,6 +6,9 @@ pub use self::basic::BasicTransformConfig; mod noop; pub use self::noop::NoopTransformConfig; +mod error_definitions; +pub use self::error_definitions::ErrorDefinitionTransformConfig; + /// Transform types. #[configurable_component] #[derive(Clone, Debug)] diff --git a/src/test_util/mock/transforms/noop.rs b/src/test_util/mock/transforms/noop.rs index 18aadec304d03..b1d23002734e9 100644 --- a/src/test_util/mock/transforms/noop.rs +++ b/src/test_util/mock/transforms/noop.rs @@ -41,6 +41,7 @@ impl TransformConfig for NoopTransformConfig { fn outputs( &self, + _: enrichment::TableRegistry, definitions: &[(OutputId, Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/topology/builder.rs b/src/topology/builder.rs index ab7f2330ddf5a..ca2d51beabe5d 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -118,7 +118,7 @@ impl<'a> Builder<'a> { let enrichment_tables = self.load_enrichment_tables().await; let source_tasks = self.build_sources().await; self.build_transforms(enrichment_tables).await; - self.build_sinks().await; + self.build_sinks(enrichment_tables).await; // We should have all the data for the enrichment tables loaded now, so switch them over to // readonly. 
@@ -402,8 +402,20 @@ impl<'a> Builder<'a> { { debug!(component = %key, "Building new transform."); - let input_definitions = - schema::input_definitions(&transform.inputs, self.config, &mut definition_cache); + let input_definitions = match schema::input_definitions( + &transform.inputs, + self.config, + enrichment_tables.clone(), + &mut definition_cache, + ) { + Ok(definitions) => definitions, + Err(_) => { + // We have received an error whilst retrieving the definitions, + // there is no point in continuing. + + return; + } + }; let merged_definition: Definition = input_definitions .iter() @@ -424,9 +436,16 @@ impl<'a> Builder<'a> { // Create a map of the outputs to the list of possible definitions from those outputs. let schema_definitions = transform .inner - .outputs(&input_definitions, self.config.schema.log_namespace()) + .outputs( + enrichment_tables.clone(), + &input_definitions, + self.config.schema.log_namespace(), + ) .into_iter() - .map(|output| (output.port, output.log_schema_definitions)) + .map(|output| { + let definitions = output.schema_definitions(self.config.schema.enabled); + (output.port, definitions) + }) .collect::>(); let context = TransformContext { @@ -440,6 +459,7 @@ impl<'a> Builder<'a> { let node = TransformNode::from_parts( key.clone(), + enrichment_tables.clone(), transform, &input_definitions, self.config.schema.log_namespace(), @@ -475,7 +495,7 @@ impl<'a> Builder<'a> { } } - async fn build_sinks(&mut self) { + async fn build_sinks(&mut self, enrichment_tables: &enrichment::TableRegistry) { for (key, sink) in self .config .sinks() @@ -493,7 +513,12 @@ impl<'a> Builder<'a> { // At this point, we've validated that all transforms are valid, including any // transform that mutates the schema provided by their sources. We can now validate the // schema expectations of each individual sink. 
- if let Err(mut err) = schema::validate_sink_expectations(key, sink, self.config) { + if let Err(mut err) = schema::validate_sink_expectations( + key, + sink, + self.config, + enrichment_tables.clone(), + ) { self.errors.append(&mut err); }; @@ -670,6 +695,7 @@ struct TransformNode { impl TransformNode { pub fn from_parts( key: ComponentKey, + enrichment_tables: enrichment::TableRegistry, transform: &TransformOuter, schema_definition: &[(OutputId, Definition)], global_log_namespace: LogNamespace, @@ -679,9 +705,11 @@ impl TransformNode { typetag: transform.inner.get_component_name(), inputs: transform.inputs.clone(), input_details: transform.inner.input(), - outputs: transform - .inner - .outputs(schema_definition, global_log_namespace), + outputs: transform.inner.outputs( + enrichment_tables, + schema_definition, + global_log_namespace, + ), enable_concurrency: transform.inner.enable_concurrency(), } } diff --git a/src/topology/schema.rs b/src/topology/schema.rs index 8e9fcd51f8704..9d00ff11544cc 100644 --- a/src/topology/schema.rs +++ b/src/topology/schema.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use snafu::Snafu; use vector_core::config::SourceOutput; pub(super) use crate::schema::Definition; @@ -9,6 +10,11 @@ use crate::{ topology, }; +#[derive(Debug, Snafu)] +pub enum Error { + ContainsNever, +} + /// The cache is used whilst building up the topology. /// TODO: Describe more, especially why we have a bool in the key. type Cache = HashMap<(bool, Vec), Vec<(OutputId, Definition)>>; @@ -16,15 +22,16 @@ type Cache = HashMap<(bool, Vec), Vec<(OutputId, Definition)>>; pub fn possible_definitions( inputs: &[OutputId], config: &dyn ComponentContainer, + enrichment_tables: enrichment::TableRegistry, cache: &mut Cache, -) -> Vec<(OutputId, Definition)> { +) -> Result, Error> { if inputs.is_empty() { - return vec![]; + return Ok(vec![]); } // Try to get the definition from the cache. 
if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) { - return definition.clone(); + return Ok(definition.clone()); } let mut definitions = Vec::new(); @@ -45,16 +52,26 @@ pub fn possible_definitions( .schema_definition(config.schema_enabled()), ); + if contains_never(&source_definition) { + return Err(Error::ContainsNever); + } + definitions.append(&mut source_definition); } // If the input is a transform, the output is merged into the top-level schema if let Some(inputs) = config.transform_inputs(key) { - let input_definitions = possible_definitions(inputs, config, cache); + let input_definitions = + possible_definitions(inputs, config, enrichment_tables.clone(), cache)?; let mut transform_definition = input.with_definitions( config - .transform_output_for_port(key, &input.port, &input_definitions) + .transform_output_for_port( + key, + &input.port, + enrichment_tables.clone(), + &input_definitions, + ) .expect("transform must exist - already found inputs") .unwrap_or_else(|| { unreachable!( @@ -62,16 +79,20 @@ pub fn possible_definitions( &input.port ) }) - .log_schema_definitions + .schema_definitions(config.schema_enabled()) .values() .cloned(), ); + if contains_never(&transform_definition) { + return Err(Error::ContainsNever); + } + definitions.append(&mut transform_definition); } } - definitions + Ok(definitions) } /// Get a list of definitions from individual pipelines feeding into a component. @@ -88,13 +109,14 @@ pub fn possible_definitions( /// 5` being expanded into two individual routes (So1 -> T3 -> T5 -> Si1 AND So1 -> T4 -> T5 -> /// Si1). pub(super) fn expanded_definitions( + enrichment_tables: enrichment::TableRegistry, inputs: &[OutputId], config: &dyn ComponentContainer, cache: &mut Cache, -) -> Vec<(OutputId, Definition)> { +) -> Result, Error> { // Try to get the definition from the cache. 
if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) { - return definitions.clone(); + return Ok(definitions.clone()); } let mut definitions: Vec<(OutputId, Definition)> = vec![]; @@ -130,21 +152,31 @@ pub(super) fn expanded_definitions( unreachable!("source output mis-configured") }); + if contains_never(&source_definitions) { + return Err(Error::ContainsNever); + } + definitions.append(&mut source_definitions); // A transform can receive from multiple inputs, and each input needs to be expanded to // a new pipeline. } else if let Some(inputs) = config.transform_inputs(key) { - let input_definitions = possible_definitions(inputs, config, &mut merged_cache); + let input_definitions = + possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?; let mut transform_definition = config - .transform_outputs(key, &input_definitions) + .transform_outputs(key, enrichment_tables.clone(), &input_definitions) .expect("already found inputs") .iter() .find_map(|output| { if output.port == input.port { Some( - input.with_definitions(output.log_schema_definitions.values().cloned()), + input.with_definitions( + output + .schema_definitions(config.schema_enabled()) + .values() + .cloned(), + ), ) } else { None @@ -154,6 +186,10 @@ pub(super) fn expanded_definitions( // error, but other parts of the topology builder deal with this state. .expect("transform output misconfigured"); + if contains_never(&transform_definition) { + return Err(Error::ContainsNever); + } + // Append whatever number of additional pipelines we created to the existing // pipeline definitions. definitions.append(&mut transform_definition); @@ -165,21 +201,24 @@ pub(super) fn expanded_definitions( definitions.clone(), ); - definitions + Ok(definitions) } /// Returns a list of definitions from the given inputs. +/// Errors if any of the definitions are [`Kind::never`] implying that +/// an error condition has been reached. 
pub(crate) fn input_definitions( inputs: &[OutputId], config: &Config, + enrichment_tables: enrichment::TableRegistry, cache: &mut Cache, -) -> Vec<(OutputId, Definition)> { +) -> Result, Error> { if inputs.is_empty() { - return vec![]; + return Ok(vec![]); } if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) { - return definitions.clone(); + return Ok(definitions.clone()); } let mut definitions = Vec::new(); @@ -201,16 +240,31 @@ pub(crate) fn input_definitions( .schema_definition(config.schema_enabled()), ); + if contains_never(&source_definitions) { + return Err(Error::ContainsNever); + } + definitions.append(&mut source_definitions); } // If the input is a transform we recurse to the upstream components to retrieve // their definitions and pass it through the transform to get the new definitions. if let Some(inputs) = config.transform_inputs(key) { - let transform_definitions = input_definitions(inputs, config, cache); + let transform_definitions = + input_definitions(inputs, config, enrichment_tables.clone(), cache)?; + + if contains_never(&transform_definitions) { + return Err(Error::ContainsNever); + } + let mut transform_definitions = input.with_definitions( config - .transform_output_for_port(key, &input.port, &transform_definitions) + .transform_output_for_port( + key, + &input.port, + enrichment_tables.clone(), + &transform_definitions, + ) .expect("transform must exist") .unwrap_or_else(|| { unreachable!( @@ -218,22 +272,36 @@ pub(crate) fn input_definitions( &input.port ) }) - .log_schema_definitions + .schema_definitions(config.schema_enabled()) .values() .cloned(), ); + if contains_never(&transform_definitions) { + return Err(Error::ContainsNever); + } + definitions.append(&mut transform_definitions); } } - definitions + Ok(definitions) +} + +/// Checks if any of the definitions in the list contain `Kind::never()`. 
This +/// implies the definition cannot contain any output and thus we should stop +/// processing further. +fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool { + transform_definitions + .iter() + .any(|(_, definition)| definition.event_kind().is_never()) } pub(super) fn validate_sink_expectations( key: &ComponentKey, sink: &SinkOuter, config: &topology::Config, + enrichment_tables: enrichment::TableRegistry, ) -> Result<(), Vec> { let mut errors = vec![]; @@ -244,7 +312,14 @@ pub(super) fn validate_sink_expectations( // Get all pipeline definitions feeding into this sink. let mut cache = HashMap::default(); - let definitions = expanded_definitions(&sink.inputs, config, &mut cache); + let definitions = + match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) { + Ok(definitions) => definitions, + Err(err) => { + errors.push(err.to_string()); + return Err(errors); + } + }; // Validate each individual definition against the sink requirement. 
for (_output, definition) in definitions { @@ -277,6 +352,7 @@ pub trait ComponentContainer { fn transform_outputs( &self, key: &ComponentKey, + enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, Definition)], ) -> Option>; @@ -289,9 +365,10 @@ pub trait ComponentContainer { &self, key: &ComponentKey, port: &Option, + enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, Definition)], ) -> Result, ()> { - if let Some(outputs) = self.transform_outputs(key, input_definitions) { + if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) { Ok(get_output_for_port(outputs, port)) } else { Err(()) @@ -347,12 +424,15 @@ impl ComponentContainer for Config { fn transform_outputs( &self, key: &ComponentKey, + enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, Definition)], ) -> Option> { self.transform(key).map(|source| { - source - .inner - .outputs(input_definitions, self.schema.log_namespace()) + source.inner.outputs( + enrichment_tables, + input_definitions, + self.schema.log_namespace(), + ) }) } } @@ -394,7 +474,8 @@ mod tests { fn transform_outputs( &self, key: &ComponentKey, - _input_definitions: &[(OutputId, Definition)], + _: enrichment::TableRegistry, + _: &[(OutputId, Definition)], ) -> Option> { self.transforms.get(key.id()).cloned().map(|v| v.1) } @@ -736,7 +817,13 @@ mod tests { }) .collect::>(); - let got = expanded_definitions(&inputs, &case, &mut HashMap::default()); + let got = expanded_definitions( + enrichment::TableRegistry::default(), + &inputs, + &case, + &mut HashMap::default(), + ) + .unwrap(); assert_eq!(got, case.want, "{}", title); } } diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index a7ebd52f7731d..21830369642eb 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -14,10 +14,11 @@ use crate::{ mock::{ basic_sink, basic_sink_failing_healthcheck, basic_sink_with_data, basic_source, 
basic_source_with_data, basic_source_with_event_counter, basic_transform, + error_definition_transform, }, start_topology, trace_init, }, - topology, + topology::{self, builder}, }; use futures::{future, stream, StreamExt}; use tokio::{ @@ -818,3 +819,26 @@ async fn topology_disk_buffer_flushes_on_idle() { let rest = out1.collect::>().await; assert_eq!(rest, vec![]); } + +#[tokio::test] +async fn topology_transform_error_definition() { + trace_init(); + + let mut config = Config::builder(); + + config.add_source("in", basic_source().1); + config.add_transform("transform", &["in"], error_definition_transform()); + config.add_sink("sink", &["transform"], basic_sink(10).1); + + let config = config.build().unwrap(); + let diff = ConfigDiff::initial(&config); + let errors = match builder::build_pieces(&config, &diff, HashMap::new()).await { + Ok(_) => panic!("build pieces should not succeed"), + Err(err) => err, + }; + + assert_eq!( + r#"Transform "transform": It all went horribly wrong"#, + errors[0] + ); +} diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index 95212582601e0..798a90ad404a7 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -48,6 +48,7 @@ impl TransformConfig for AggregateConfig { fn outputs( &self, + _: enrichment::TableRegistry, _: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index 9a66140de50fe..68c81b6b30bad 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -246,6 +246,7 @@ impl TransformConfig for Ec2Metadata { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index c62f40b31a339..658754c10bc68 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -155,6 +155,7 @@ impl TransformConfig for 
DedupeConfig { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index 9351c1d3c724b..97f580ba0e671 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -53,6 +53,7 @@ impl TransformConfig for FilterConfig { fn outputs( &self, + _enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 59c0eea625da8..3ad4df3f0f1ab 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -159,6 +159,7 @@ impl TransformConfig for LogToMetricConfig { fn outputs( &self, + _: enrichment::TableRegistry, _: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/lua/mod.rs b/src/transforms/lua/mod.rs index ab43bc911b948..febcf58928ff5 100644 --- a/src/transforms/lua/mod.rs +++ b/src/transforms/lua/mod.rs @@ -105,6 +105,7 @@ impl TransformConfig for LuaConfig { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index e744ab02b94c1..959d6e6f31140 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -94,6 +94,7 @@ impl TransformConfig for MetricToLogConfig { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, Definition)], global_log_namespace: LogNamespace, ) -> Vec { diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 1e98086a52053..40d8964a9ee27 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -127,6 +127,7 @@ impl TransformConfig for ReduceConfig { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, 
schema::Definition)], _: LogNamespace, ) -> Vec { @@ -515,10 +516,14 @@ group_by = [ "request_id" ] None, ); let schema_definitions = reduce_config - .outputs(&[("test".into(), input_definition)], LogNamespace::Legacy) + .outputs( + enrichment::TableRegistry::default(), + &[("test".into(), input_definition)], + LogNamespace::Legacy, + ) .first() .unwrap() - .log_schema_definitions + .schema_definitions(true) .clone(); let (tx, rx) = mpsc::channel(1); diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index 7a2efa07b7f6a..1872f4e49d9db 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -11,6 +11,8 @@ use codecs::MetricTagValues; use lookup::lookup_v2::{parse_value_path, ValuePath}; use lookup::{metadata_path, owned_value_path, path, OwnedTargetPath, PathPrefix}; use snafu::{ResultExt, Snafu}; +use value::kind::merge::{CollisionStrategy, Strategy}; +use value::kind::Collection; use value::Kind; use vector_common::TimeZone; use vector_config::configurable_component; @@ -226,6 +228,7 @@ impl TransformConfig for RemapConfig { fn outputs( &self, + enrichment_tables: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { @@ -239,7 +242,7 @@ impl TransformConfig for RemapConfig { // transform. We ignore any compilation errors, as those are caught by the transform build // step. 
let compiled = self - .compile_vrl_program(enrichment::TableRegistry::default(), merged_definition) + .compile_vrl_program(enrichment_tables, merged_definition) .map(|(program, _, _, external_context)| { ( program.final_type_state(), @@ -339,8 +342,14 @@ impl TransformConfig for RemapConfig { ); } - default_definitions.insert(output_id.clone(), default_definition); - dropped_definitions.insert(output_id.clone(), dropped_definition); + default_definitions.insert( + output_id.clone(), + move_field_definitions_into_message(merge_array_definitions(default_definition)), + ); + dropped_definitions.insert( + output_id.clone(), + move_field_definitions_into_message(merge_array_definitions(dropped_definition)), + ); } let default_output = TransformOutput::new(DataType::all(), default_definitions); @@ -650,6 +659,58 @@ fn push_dropped( output.push_named(DROPPED, event) } +/// If the VRL returns a value that is not an array (see [`merge_array_definitions`]), +/// or an object, that data is moved into the `message` field. +fn move_field_definitions_into_message(mut definition: schema::Definition) -> schema::Definition { + let mut message = definition.event_kind().clone(); + message.remove_object(); + message.remove_array(); + + if !message.is_never() { + // We need to add the given message type to a field called `message` + // in the event. 
+ let message = Kind::object(Collection::from(BTreeMap::from([( + log_schema().message_key().into(), + message, + )]))); + + definition.event_kind_mut().remove_bytes(); + definition.event_kind_mut().remove_integer(); + definition.event_kind_mut().remove_float(); + definition.event_kind_mut().remove_boolean(); + definition.event_kind_mut().remove_timestamp(); + definition.event_kind_mut().remove_regex(); + definition.event_kind_mut().remove_null(); + + *definition.event_kind_mut() = definition.event_kind().union(message); + } + + definition +} + +/// If the transform returns an array, the elements of this array will be separated +/// out into its individual elements and passed downstream. +/// +/// The potential types that the transform can output are any of the array's +/// elements or any non-array elements that are within the definition. All these +/// definitions need to be merged together. +fn merge_array_definitions(mut definition: schema::Definition) -> schema::Definition { + if let Some(array) = definition.event_kind().as_array() { + let array_kinds = array.reduced_kind(); + + let kind = definition.event_kind_mut(); + kind.remove_array(); + kind.merge( + array_kinds, + Strategy { + collisions: CollisionStrategy::Union, + }, + ); + } + + definition +} + + #[derive(Debug, Snafu)] pub enum BuildError { #[snafu(display("must provide exactly one of `source` or `file` configuration"))] @@ -663,10 +724,13 @@ pub enum BuildError { #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use indoc::{formatdoc, indoc}; - use value::btreemap; + use value::{ + btreemap, + kind::{Collection, Index}, + }; use vector_core::{config::GlobalOptions, event::EventMetadata, metric_tags}; use super::*; @@ -1446,6 +1510,7 @@ mod tests { assert_eq!( conf.outputs( + enrichment::TableRegistry::default(), &[( "test".into(), schema::Definition::new_with_default_metadata( @@ -1591,4 +1656,423 @@ mod tests { }) .await } + + #[test] + fn 
test_field_definitions_in_message() { + let definition = + schema::Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Legacy]); + assert_eq!( + schema::Definition::new_with_default_metadata( + Kind::object(BTreeMap::from([("message".into(), Kind::bytes())])), + [LogNamespace::Legacy] + ), + move_field_definitions_into_message(definition) + ); + + // Test when a message field already exists. + let definition = schema::Definition::new_with_default_metadata( + Kind::object(BTreeMap::from([("message".into(), Kind::integer())])).or_bytes(), + [LogNamespace::Legacy], + ); + assert_eq!( + schema::Definition::new_with_default_metadata( + Kind::object(BTreeMap::from([( + "message".into(), + Kind::bytes().or_integer() + )])), + [LogNamespace::Legacy] + ), + move_field_definitions_into_message(definition) + ) + } + + #[test] + fn test_merged_array_definitions_simple() { + // Test merging the array definitions where the schema definition + // is simple, containing only one possible type in the array. + let object: BTreeMap = [ + ("carrot".into(), Kind::bytes()), + ("potato".into(), Kind::integer()), + ] + .into(); + + let kind = Kind::array(Collection::from_unknown(Kind::object(object))); + + let definition = + schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + + let kind = Kind::object(BTreeMap::from([ + ("carrot".into(), Kind::bytes()), + ("potato".into(), Kind::integer()), + ])); + + let wanted = schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + let merged = merge_array_definitions(definition); + + assert_eq!(wanted, merged); + } + + #[test] + fn test_merged_array_definitions_complex() { + // Test merging the array definitions where the schema definition + // is fairly complex containing multiple different possible types. 
+ let object: BTreeMap = [ + ("carrot".into(), Kind::bytes()), + ("potato".into(), Kind::integer()), + ] + .into(); + + let array: BTreeMap = [ + (Index::from(0), Kind::integer()), + (Index::from(1), Kind::boolean()), + ( + Index::from(2), + Kind::object(BTreeMap::from([("peas".into(), Kind::bytes())])), + ), + ] + .into(); + + let mut kind = Kind::bytes(); + kind.add_object(object); + kind.add_array(array); + + let definition = + schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + + let mut kind = Kind::bytes(); + kind.add_integer(); + kind.add_boolean(); + kind.add_object(BTreeMap::from([ + ("carrot".into(), Kind::bytes().or_undefined()), + ("potato".into(), Kind::integer().or_undefined()), + ("peas".into(), Kind::bytes().or_undefined()), + ])); + + let wanted = schema::Definition::new_with_default_metadata(kind, [LogNamespace::Legacy]); + let merged = merge_array_definitions(definition); + + assert_eq!(wanted, merged); + } + + #[test] + fn test_combined_transforms_simple() { + // Make sure that when getting the definitions from one transform and + // passing them to another the correct definition is still produced. + + // Transform 1 sets a simple value. + let transform1 = RemapConfig { + source: Some(r#".thing = "potato""#.to_string()), + ..Default::default() + }; + + let transform2 = RemapConfig { + source: Some(".thang = .thing".to_string()), + ..Default::default() + }; + + let enrichment_tables = enrichment::TableRegistry::default(); + + let outputs1 = transform1.outputs( + enrichment_tables.clone(), + &[("in".into(), schema::Definition::default_legacy_namespace())], + LogNamespace::Legacy, + ); + + assert_eq!( + vec![TransformOutput::new( + DataType::all(), + // The `never` definition should have been passed on to the end. 
+ [( + "in".into(), + Definition::default_legacy_namespace().with_event_field( + &owned_value_path!("thing"), + Kind::bytes(), + None + ), + )] + .into() + )], + outputs1 + ); + + let outputs2 = transform2.outputs( + enrichment_tables, + &[( + "in1".into(), + outputs1[0].schema_definitions(true)[&"in".into()].clone(), + )], + LogNamespace::Legacy, + ); + + assert_eq!( + vec![TransformOutput::new( + DataType::all(), + [( + "in1".into(), + Definition::default_legacy_namespace() + .with_event_field(&owned_value_path!("thing"), Kind::bytes(), None) + .with_event_field(&owned_value_path!("thang"), Kind::bytes(), None), + )] + .into(), + )], + outputs2 + ); + } + + #[test] + fn test_combined_transforms_unnest() { + // Make sure that when getting the definitions from one transform and + // passing them to another the correct definition is still produced. + + // Transform 1 sets a simple value. + let transform1 = RemapConfig { + source: Some( + indoc! { + r#" + .thing = [{"cabbage": 32}, {"parsnips": 45}] + . 
= unnest(.thing) + "# + } + .to_string(), + ), + ..Default::default() + }; + + let transform2 = RemapConfig { + source: Some(r#".thang = .thing.cabbage || "beetroot""#.to_string()), + ..Default::default() + }; + + let enrichment_tables = enrichment::TableRegistry::default(); + + let outputs1 = transform1.outputs( + enrichment_tables.clone(), + &[( + "in".into(), + schema::Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy], + ), + )], + LogNamespace::Legacy, + ); + + assert_eq!( + vec![TransformOutput::new( + DataType::all(), + [( + "in".into(), + Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy] + ) + .with_event_field( + &owned_value_path!("thing"), + Kind::object(Collection::from(BTreeMap::from([ + ("cabbage".into(), Kind::integer().or_undefined(),), + ("parsnips".into(), Kind::integer().or_undefined(),) + ]))), + None + ), + )] + .into(), + )], + outputs1 + ); + + let outputs2 = transform2.outputs( + enrichment_tables, + &[( + "in1".into(), + outputs1[0].schema_definitions(true)[&"in".into()].clone(), + )], + LogNamespace::Legacy, + ); + + assert_eq!( + vec![TransformOutput::new( + DataType::all(), + [( + "in1".into(), + Definition::default_legacy_namespace() + .with_event_field( + &owned_value_path!("thing"), + Kind::object(Collection::from(BTreeMap::from([ + ("cabbage".into(), Kind::integer().or_undefined(),), + ("parsnips".into(), Kind::integer().or_undefined(),) + ]))), + None + ) + .with_event_field( + &owned_value_path!("thang"), + Kind::integer().or_null(), + None + ), + )] + .into(), + )], + outputs2 + ); + } + + #[test] + fn test_transform_abort() { + // An abort should not change the typedef. 
+ + let transform1 = RemapConfig { + source: Some(r#"abort"#.to_string()), + ..Default::default() + }; + + let enrichment_tables = enrichment::TableRegistry::default(); + + let outputs1 = transform1.outputs( + enrichment_tables, + &[( + "in".into(), + schema::Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy], + ), + )], + LogNamespace::Legacy, + ); + + assert_eq!( + vec![TransformOutput::new( + DataType::all(), + [( + "in".into(), + Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy] + ), + )] + .into(), + )], + outputs1 + ); + } + + #[test] + fn test_error_outputs() { + // Even if we fail to compile the VRL it should still output + // the correct ports. This may change if we separate the + // `outputs` function into one returning outputs and a separate + // returning schema definitions. + let transform1 = RemapConfig { + // This enrichment table does not exist. + source: Some(r#". |= get_enrichment_table_record("carrot", {"id": .id})"#.to_string()), + reroute_dropped: true, + ..Default::default() + }; + + let enrichment_tables = enrichment::TableRegistry::default(); + + let outputs1 = transform1.outputs( + enrichment_tables, + &[( + "in".into(), + schema::Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy], + ), + )], + LogNamespace::Legacy, + ); + + assert_eq!( + HashSet::from([None, Some("dropped".to_string())]), + outputs1 + .into_iter() + .map(|output| output.port) + .collect::>() + ); + } + + #[test] + fn test_non_object_events() { + let transform1 = RemapConfig { + // This enrichment table does not exist. + source: Some(r#". 
= "fish" "#.to_string()), + ..Default::default() + }; + + let enrichment_tables = enrichment::TableRegistry::default(); + + let outputs1 = transform1.outputs( + enrichment_tables, + &[( + "in".into(), + schema::Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy], + ), + )], + LogNamespace::Legacy, + ); + + let wanted = schema::Definition::new_with_default_metadata( + Kind::object(Collection::from_unknown(Kind::undefined())), + [LogNamespace::Legacy], + ) + .with_event_field(&owned_value_path!("message"), Kind::bytes(), None); + + assert_eq!( + HashMap::from([(OutputId::from("in"), wanted)]), + outputs1[0].schema_definitions(true), + ); + } + + #[test] + fn test_array_and_non_object_events() { + let transform1 = RemapConfig { + source: Some( + indoc! {r#" + if .lizard == true { + .thing = [{"cabbage": 42}]; + . = unnest(.thing) + } else { + . = "fish" + } + "#} + .to_string(), + ), + ..Default::default() + }; + + let enrichment_tables = enrichment::TableRegistry::default(); + + let outputs1 = transform1.outputs( + enrichment_tables, + &[( + "in".into(), + schema::Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy], + ), + )], + LogNamespace::Legacy, + ); + + let wanted = schema::Definition::new_with_default_metadata( + Kind::any_object(), + [LogNamespace::Legacy], + ) + .with_event_field(&owned_value_path!("message"), Kind::any(), None) + .with_event_field( + &owned_value_path!("thing"), + Kind::object(Collection::from(BTreeMap::from([( + "cabbage".into(), + Kind::integer(), + )]))) + .or_undefined(), + None, + ); + + assert_eq!( + HashMap::from([(OutputId::from("in"), wanted)]), + outputs1[0].schema_definitions(true), + ); + } } diff --git a/src/transforms/route.rs b/src/transforms/route.rs index 971d678ffe170..adcac43ff504c 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -106,6 +106,7 @@ impl TransformConfig for RouteConfig { fn outputs( &self, + _: enrichment::TableRegistry, 
input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/sample.rs b/src/transforms/sample.rs index eec1a2652c4ac..4238ab6e146d2 100644 --- a/src/transforms/sample.rs +++ b/src/transforms/sample.rs @@ -71,6 +71,7 @@ impl TransformConfig for SampleConfig { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index 8eca913f8c416..d8fe74ea1bb8d 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -114,6 +114,7 @@ impl TransformConfig for TagCardinalityLimitConfig { fn outputs( &self, + _: enrichment::TableRegistry, _: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { diff --git a/src/transforms/throttle.rs b/src/transforms/throttle.rs index 4b97a40410e0b..f1563ad7e5435 100644 --- a/src/transforms/throttle.rs +++ b/src/transforms/throttle.rs @@ -61,6 +61,7 @@ impl TransformConfig for ThrottleConfig { fn outputs( &self, + _: enrichment::TableRegistry, input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec {