Skip to content

Commit

Permalink
fix(schemas): Don't panic with non-object field kinds (vectordotdev#17140
Browse files Browse the repository at this point in the history
)

* Don't panic with non-object field kinds

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Check remap input definition is never

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Spelling

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Add enrichment tables to the transform outputs

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Handle remap array outputs

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Return the panics

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Revert "Return the panics"

This reverts commit 8ed6528.

* Return the panics

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Added multiple transform tests

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Mild spacing

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Mild spacing

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Add get_transform_output_ids to wrap calls to outputs that don't need definitions

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Test even a VRL error results in the correct ports

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Test a mix of array and non array results

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Error if a returned definition contains never

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Remove never check in transform, the framework should handle it

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Feedback from Kyle

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Feedback from Kyle

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Feedback from Spencer

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Spelling

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Feedback from Nathan

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Whitespace

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Only use transform definition fields when schema is enabled

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Fixed syntax error

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Clippy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Fixed test. Non-object top-level fields need removing

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Apr 27, 2023
1 parent 8067f84 commit 1e43208
Show file tree
Hide file tree
Showing 29 changed files with 855 additions and 119 deletions.
33 changes: 32 additions & 1 deletion lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub struct TransformOutput {
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
Expand All @@ -222,6 +222,37 @@ impl TransformOutput {
self.port = Some(name.into());
self
}

/// Returns the schema [`schema::Definition`]s of this output, keyed by [`OutputId`].
///
/// * When `schema_enabled` is `true`, the stored definitions are returned in full,
///   including the fields and their associated types.
/// * When it is `false`, every stored definition is replaced by the default
///   definition for its log namespaces. If those namespaces include
///   [`LogNamespace::Vector`], the meanings of the stored definition are carried
///   over onto the simplified one.
///
/// `schema_enabled` is set in the user's configuration.
#[must_use]
pub fn schema_definitions(
    &self,
    schema_enabled: bool,
) -> HashMap<OutputId, schema::Definition> {
    // Full schema support requested: hand back a copy of what we have.
    if schema_enabled {
        return self.log_schema_definitions.clone();
    }

    // Otherwise strip each definition down to the namespace default.
    let mut simplified = HashMap::new();
    for (output_id, full_definition) in &self.log_schema_definitions {
        let namespaces = full_definition.log_namespaces();
        let mut simple_definition = schema::Definition::default_for_namespace(namespaces);

        // Meanings are only kept for the Vector namespace.
        if namespaces.contains(&LogNamespace::Vector) {
            simple_definition.add_meanings(full_definition.meanings());
        }

        simplified.insert(output_id.clone(), simple_definition);
    }
    simplified
}
}

/// Simple utility function that can be used by transforms that make no changes to
Expand Down
20 changes: 8 additions & 12 deletions src/api/schema/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
use vector_config::NamedComponent;
use vector_core::internal_event::DEFAULT_OUTPUT;

use crate::topology::schema::possible_definitions;
use crate::{
api::schema::{
components::state::component_by_component_key,
filter::{self, filter_items},
relay, sort,
},
config::{ComponentKey, Config},
config::{get_transform_output_ids, ComponentKey, Config},
filter_check,
};

Expand Down Expand Up @@ -254,7 +253,6 @@ impl ComponentsSubscription {

/// Update the 'global' configuration that will be consumed by component queries
pub fn update_config(config: &Config) {
let mut cache = HashMap::new();
let mut new_components = HashMap::new();

// Sources
Expand Down Expand Up @@ -291,15 +289,13 @@ pub fn update_config(config: &Config) {
component_key: component_key.clone(),
component_type: transform.inner.get_component_name().to_string(),
inputs: transform.inputs.clone(),
outputs: transform
.inner
.outputs(
&possible_definitions(&transform.inputs, config, &mut cache),
config.schema.log_namespace(),
)
.into_iter()
.map(|output| output.port.unwrap_or_else(|| DEFAULT_OUTPUT.to_string()))
.collect(),
outputs: get_transform_output_ids(
transform.inner.as_ref(),
"".into(),
config.schema.log_namespace(),
)
.map(|output| output.port.unwrap_or_else(|| DEFAULT_OUTPUT.to_string()))
.collect(),
})),
);
}
Expand Down
14 changes: 3 additions & 11 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use indexmap::IndexSet;

use super::{
builder::ConfigBuilder, graph::Graph, id::Inputs, schema, validation, Config, OutputId,
builder::ConfigBuilder, graph::Graph, id::Inputs, transform::get_transform_output_ids,
validation, Config, OutputId,
};

pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
Expand Down Expand Up @@ -137,16 +138,7 @@ pub(crate) fn expand_globs(config: &mut ConfigBuilder) {
})
})
.chain(config.transforms.iter().flat_map(|(key, t)| {
t.inner
.outputs(
&[(key.into(), schema::Definition::any())],
config.schema.log_namespace(),
)
.into_iter()
.map(|output| OutputId {
component: key.clone(),
port: output.port,
})
get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace())
}))
.map(|output_id| output_id.to_string())
.collect::<IndexSet<String>>();
Expand Down
1 change: 1 addition & 0 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl Graph {
Node::Transform {
in_ty: transform.inner.input().data_type(),
outputs: transform.inner.outputs(
enrichment::TableRegistry::default(),
&[(id.into(), schema::Definition::any())],
schema.log_namespace(),
),
Expand Down
4 changes: 3 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ pub use provider::ProviderConfig;
pub use secret::SecretBackend;
pub use sink::{SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
pub use transform::{BoxedTransform, TransformConfig, TransformContext, TransformOuter};
pub use transform::{
get_transform_output_ids, BoxedTransform, TransformConfig, TransformContext, TransformOuter,
};
pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
pub use validation::warnings;
pub use vector_core::config::{
Expand Down
21 changes: 21 additions & 0 deletions src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
/// of events flowing through the transform.
fn outputs(
&self,
enrichment_tables: enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
global_log_namespace: LogNamespace,
) -> Vec<TransformOutput>;
Expand Down Expand Up @@ -236,3 +237,23 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
}

dyn_clone::clone_trait_object!(TransformConfig);

/// Lists the [`OutputId`]s of a transform without requiring real schema definitions.
///
/// `outputs` is called with a default enrichment table registry and a single
/// placeholder input (derived from `key`) whose definition is
/// `schema::Definition::any()`. Only the `port` of each returned output is used;
/// it is paired with `key` to form the resulting [`OutputId`].
pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
    transform: &T,
    key: ComponentKey,
    global_log_namespace: LogNamespace,
) -> impl Iterator<Item = OutputId> + '_ {
    // A stand-in input so that `outputs` has something to work with.
    let placeholder_input: [(OutputId, schema::Definition); 1] =
        [(key.clone().into(), schema::Definition::any())];

    let transform_outputs = transform.outputs(
        enrichment::TableRegistry::default(),
        &placeholder_input,
        global_log_namespace,
    );

    transform_outputs.into_iter().map(move |transform_output| {
        let component = key.clone();
        let port = transform_output.port;
        OutputId { component, port }
    })
}
34 changes: 13 additions & 21 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use self::unit_test_components::{
UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
};
use super::{compiler::expand_globs, graph::Graph, OutputId};
use super::{compiler::expand_globs, graph::Graph, transform::get_transform_output_ids, OutputId};
use crate::{
conditions::Condition,
config::{
Expand Down Expand Up @@ -186,14 +186,11 @@ impl UnitTestBuildMetadata {
.transforms
.iter()
.flat_map(|(key, transform)| {
transform
.inner
.outputs(&[], builder.schema.log_namespace())
.into_iter()
.map(|output| OutputId {
component: key.clone(),
port: output.port,
})
get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
builder.schema.log_namespace(),
)
})
.collect::<HashSet<_>>();

Expand Down Expand Up @@ -457,18 +454,13 @@ async fn build_unit_test(
fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
let config = config.clone();
let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
transform
.inner
.outputs(&[], config.schema.log_namespace())
.iter()
.map(|output| {
if let Some(port) = &output.port {
OutputId::from((key, port.clone())).to_string()
} else {
key.to_string()
}
})
.collect::<Vec<_>>()
get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
config.schema.log_namespace(),
)
.map(|output| output.to_string())
.collect::<Vec<_>>()
});

let mut loose_end_outputs = Vec::new();
Expand Down
45 changes: 18 additions & 27 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::{config::schema, topology::schema::possible_definitions};
use crate::config::schema;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use heim::{disk::Partition, units::information::byte};
use indexmap::IndexMap;
use std::{collections::HashMap, path::PathBuf};
use vector_core::internal_event::DEFAULT_OUTPUT;

use super::{builder::ConfigBuilder, ComponentKey, Config, OutputId, Resource};
use super::{
builder::ConfigBuilder, transform::get_transform_output_ids, ComponentKey, Config, OutputId,
Resource,
};

/// Check that provide + topology config aren't present in the same builder, which is an error.
pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
Expand Down Expand Up @@ -169,15 +172,12 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
}

if transform
.inner
.outputs(
&[(OutputId::dummy(), definition)],
config.schema.log_namespace(),
)
.iter()
.map(|output| output.port.as_deref().unwrap_or(""))
.any(|name| name == DEFAULT_OUTPUT)
if get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
config.schema.log_namespace(),
)
.any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
{
errors.push(format!(
"Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
Expand Down Expand Up @@ -325,7 +325,6 @@ async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap

pub fn warnings(config: &Config) -> Vec<String> {
let mut warnings = vec![];
let mut cache = HashMap::new();

let source_ids = config.sources.iter().flat_map(|(key, source)| {
source
Expand All @@ -342,21 +341,13 @@ pub fn warnings(config: &Config) -> Vec<String> {
.collect::<Vec<_>>()
});
let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
transform
.inner
.outputs(
&possible_definitions(&transform.inputs, config, &mut cache),
config.schema.log_namespace(),
)
.iter()
.map(|output| {
if let Some(port) = &output.port {
("transform", OutputId::from((key, port.clone())))
} else {
("transform", OutputId::from(key))
}
})
.collect::<Vec<_>>()
get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
config.schema.log_namespace(),
)
.map(|output| ("transform", output))
.collect::<Vec<_>>()
});

for (input_type, id) in transform_ids.chain(source_ids) {
Expand Down
6 changes: 5 additions & 1 deletion src/test_util/mock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use self::{
BackpressureSourceConfig, BasicSourceConfig, ErrorSourceConfig, PanicSourceConfig,
TripwireSourceConfig,
},
transforms::BasicTransformConfig,
transforms::{BasicTransformConfig, ErrorDefinitionTransformConfig},
};

pub mod sinks;
Expand Down Expand Up @@ -66,6 +66,10 @@ pub fn basic_transform(suffix: &str, increase: f64) -> BasicTransformConfig {
BasicTransformConfig::new(suffix.to_owned(), increase)
}

/// Builds the mock transform config of type [`ErrorDefinitionTransformConfig`].
///
/// The config is constructed with no field values, so this simply returns the
/// empty struct.
pub const fn error_definition_transform() -> ErrorDefinitionTransformConfig {
ErrorDefinitionTransformConfig {}
}

/// Builds a [`BackpressureSinkConfig`] carrying the given `num_to_consume`.
///
/// NOTE(review): presumably `num_to_consume` is the number of events the mock
/// sink accepts before it stops consuming — confirm against
/// `BackpressureSinkConfig`.
pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig {
BackpressureSinkConfig { num_to_consume }
}
Expand Down
1 change: 1 addition & 0 deletions src/test_util/mock/transforms/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl TransformConfig for BasicTransformConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
62 changes: 62 additions & 0 deletions src/test_util/mock/transforms/error_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use async_trait::async_trait;
use snafu::Snafu;
use value::Kind;
use vector_config::configurable_component;
use vector_core::{
config::{DataType, Input, LogNamespace, TransformOutput},
schema::Definition,
transform::Transform,
};

use crate::config::{OutputId, TransformConfig, TransformContext};

/// Error type used by the mock transform defined in this file.
#[derive(Debug, Snafu)]
enum Error {
// The only failure mode; `build` below returns it unconditionally.
#[snafu(display("It all went horribly wrong"))]
ItAllWentHorriblyWrong,
}

// Mock transform config: its `outputs` implementation reports a `Kind::never`
// definition for every input definition, and its `build` always returns an error
// (see the `TransformConfig` impl below). The struct has no fields.
/// Configuration for the `test_error_definition` transform.
#[configurable_component(transform("test_error_definition", "Test (error definition)"))]
#[derive(Clone, Debug, Default)]
pub struct ErrorDefinitionTransformConfig {}

impl_generate_config_from_default!(ErrorDefinitionTransformConfig);

#[async_trait]
#[typetag::serde(name = "test_error_definition")]
impl TransformConfig for ErrorDefinitionTransformConfig {
fn input(&self) -> Input {
Input::all()
}

fn outputs(
&self,
_: enrichment::TableRegistry,
definitions: &[(OutputId, Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
vec![TransformOutput::new(
DataType::all(),
definitions
.iter()
.map(|(output, definition)| {
(
output.clone(),
// Return a definition of Kind::never implying that we can never return a value.
Definition::new_with_default_metadata(
Kind::never(),
definition.log_namespaces().clone(),
),
)
})
.collect(),
)]
}

async fn build(&self, _: &TransformContext) -> crate::Result<Transform> {
// Even though the definitions returned were `Kind::never`, build needs to be
// called in order to return the Error.
Err(Error::ItAllWentHorriblyWrong.into())
}
}
Loading

0 comments on commit 1e43208

Please sign in to comment.