Skip to content

Commit

Permalink
chore(dev): Add wrapper for enrichment to vector-lib (#18977)
Browse files Browse the repository at this point in the history
  • Loading branch information
bruceg authored Oct 30, 2023
1 parent b55e436 commit aadde5f
Show file tree
Hide file tree
Showing 31 changed files with 83 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ pin-project.workspace = true

# Internal libs
dnsmsg-parser = { path = "lib/dnsmsg-parser", optional = true }
enrichment = { path = "lib/enrichment" }
fakedata = { path = "lib/fakedata", optional = true }
file-source = { path = "lib/file-source", optional = true }
lookup = { package = "vector-lookup", path = "lib/vector-lookup" }
Expand Down
1 change: 1 addition & 0 deletions lib/vector-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ publish = false

[dependencies]
codecs = { path = "../codecs", default-features = false }
enrichment = { path = "../enrichment" }
vector-buffers = { path = "../vector-buffers", default-features = false }
vector-common = { path = "../vector-common" }
vector-config = { path = "../vector-config" }
Expand Down
1 change: 1 addition & 0 deletions lib/vector-lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub use codecs;
pub use enrichment;
pub use vector_buffers as buffers;
pub use vector_common::{
assert_event_data_eq, btreemap, byte_size_of, byte_size_of::ByteSizeOf, conversion,
Expand Down
5 changes: 4 additions & 1 deletion src/conditions/datadog_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ impl Conditional for DatadogSearchRunner {
}

impl ConditionalConfig for DatadogSearchConfig {
fn build(&self, _enrichment_tables: &enrichment::TableRegistry) -> crate::Result<Condition> {
fn build(
&self,
_enrichment_tables: &vector_lib::enrichment::TableRegistry,
) -> crate::Result<Condition> {
let node = parse(&self.source)?;
let matcher = as_log(build_matcher(&node, &EventFilter));

Expand Down
15 changes: 12 additions & 3 deletions src/conditions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ pub enum ConditionConfig {
}

impl ConditionConfig {
pub fn build(&self, enrichment_tables: &enrichment::TableRegistry) -> crate::Result<Condition> {
pub fn build(
&self,
enrichment_tables: &vector_lib::enrichment::TableRegistry,
) -> crate::Result<Condition> {
match self {
ConditionConfig::IsLog => Ok(Condition::IsLog),
ConditionConfig::IsMetric => Ok(Condition::IsMetric),
Expand Down Expand Up @@ -148,7 +151,10 @@ pub trait Conditional: std::fmt::Debug {
}

pub trait ConditionalConfig: std::fmt::Debug + Send + Sync + dyn_clone::DynClone {
fn build(&self, enrichment_tables: &enrichment::TableRegistry) -> crate::Result<Condition>;
fn build(
&self,
enrichment_tables: &vector_lib::enrichment::TableRegistry,
) -> crate::Result<Condition>;
}

dyn_clone::clone_trait_object!(ConditionalConfig);
Expand Down Expand Up @@ -184,7 +190,10 @@ pub enum AnyCondition {
}

impl AnyCondition {
pub fn build(&self, enrichment_tables: &enrichment::TableRegistry) -> crate::Result<Condition> {
pub fn build(
&self,
enrichment_tables: &vector_lib::enrichment::TableRegistry,
) -> crate::Result<Condition> {
match self {
AnyCondition::String(s) => {
let vrl_config = VrlConfig {
Expand Down
7 changes: 5 additions & 2 deletions src/conditions/vrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub struct VrlConfig {
impl_generate_config_from_default!(VrlConfig);

impl ConditionalConfig for VrlConfig {
fn build(&self, enrichment_tables: &enrichment::TableRegistry) -> crate::Result<Condition> {
fn build(
&self,
enrichment_tables: &vector_lib::enrichment::TableRegistry,
) -> crate::Result<Condition> {
// TODO(jean): re-add this to VRL
// let constraint = TypeConstraint {
// allow_any: false,
Expand All @@ -43,7 +46,7 @@ impl ConditionalConfig for VrlConfig {

let functions = vrl::stdlib::all()
.into_iter()
.chain(enrichment::vrl_functions())
.chain(vector_lib::enrichment::vrl_functions())
.chain(vector_vrl_functions::all())
.collect::<Vec<_>>();

Expand Down
2 changes: 1 addition & 1 deletion src/config/enrichment_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ pub trait EnrichmentTableConfig: NamedComponent + core::fmt::Debug + Send + Sync
async fn build(
&self,
globals: &GlobalOptions,
) -> crate::Result<Box<dyn enrichment::Table + Send + Sync>>;
) -> crate::Result<Box<dyn vector_lib::enrichment::Table + Send + Sync>>;
}
2 changes: 1 addition & 1 deletion src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Graph {
Node::Transform {
in_ty: transform.inner.input().data_type(),
outputs: transform.inner.outputs(
enrichment::TableRegistry::default(),
vector_lib::enrichment::TableRegistry::default(),
&[(id.into(), schema::Definition::any())],
schema.log_namespace(),
),
Expand Down
6 changes: 3 additions & 3 deletions src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub struct TransformContext {

pub globals: GlobalOptions,

pub enrichment_tables: enrichment::TableRegistry,
pub enrichment_tables: vector_lib::enrichment::TableRegistry,

/// Tracks the schema IDs assigned to schemas exposed by the transform.
///
Expand Down Expand Up @@ -193,7 +193,7 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
/// of events flowing through the transform.
fn outputs(
&self,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],

// This only exists for transforms that create logs from non-logs, to know which namespace
Expand Down Expand Up @@ -250,7 +250,7 @@ pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
) -> impl Iterator<Item = OutputId> + '_ {
transform
.outputs(
enrichment::TableRegistry::default(),
vector_lib::enrichment::TableRegistry::default(),
&[(key.clone().into(), schema::Definition::any())],
global_log_namespace,
)
Expand Down
4 changes: 2 additions & 2 deletions src/enrichment_tables/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use std::{
};

use bytes::Bytes;
use enrichment::{Case, Condition, IndexHandle, Table};
use tracing::trace;
use vector_lib::configurable::configurable_component;
use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};
use vector_lib::{conversion::Conversion, TimeZone};
use vrl::value::Value;

Expand Down Expand Up @@ -250,7 +250,7 @@ impl EnrichmentTableConfig for FileConfig {

impl_generate_config_from_default!(FileConfig);

/// A struct that implements [enrichment::Table] to handle loading enrichment data from a CSV file.
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a CSV file.
#[derive(Clone)]
pub struct File {
config: FileConfig,
Expand Down
4 changes: 2 additions & 2 deletions src/enrichment_tables/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use maxminddb::{
use ordered_float::NotNan;
use vrl::value::Value;

use enrichment::{Case, Condition, IndexHandle, Table};
use vector_lib::configurable::configurable_component;
use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};

use crate::config::{EnrichmentTableConfig, GenerateConfig};

Expand Down Expand Up @@ -101,7 +101,7 @@ impl EnrichmentTableConfig for GeoipConfig {
}

#[derive(Clone)]
/// A struct that implements [enrichment::Table] to handle loading enrichment data from a GeoIP database.
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a GeoIP database.
pub struct Geoip {
config: GeoipConfig,
dbreader: Arc<maxminddb::Reader<Vec<u8>>>,
Expand Down
2 changes: 1 addition & 1 deletion src/enrichment_tables/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Functionality to handle enrichment tables.
pub use enrichment::{Condition, IndexHandle, Table};
use enum_dispatch::enum_dispatch;
use vector_lib::configurable::{configurable_component, NamedComponent};
pub use vector_lib::enrichment::{Condition, IndexHandle, Table};

use crate::config::{EnrichmentTableConfig, GlobalOptions};

Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl TransformConfig for BasicTransformConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/error_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl TransformConfig for ErrorDefinitionTransformConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
definitions: &[(OutputId, Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl TransformConfig for NoopTransformConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
definitions: &[(OutputId, Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
15 changes: 9 additions & 6 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ use crate::{
SourceSender,
};

static ENRICHMENT_TABLES: Lazy<enrichment::TableRegistry> =
Lazy::new(enrichment::TableRegistry::default);
static ENRICHMENT_TABLES: Lazy<vector_lib::enrichment::TableRegistry> =
Lazy::new(vector_lib::enrichment::TableRegistry::default);

pub(crate) static SOURCE_SENDER_BUFFER_SIZE: Lazy<usize> =
Lazy::new(|| *TRANSFORM_CONCURRENCY_LIMIT * CHUNK_SIZE);
Expand Down Expand Up @@ -149,7 +149,7 @@ impl<'a> Builder<'a> {

/// Loads, or reloads the enrichment tables.
/// The tables are stored in the `ENRICHMENT_TABLES` global variable.
async fn load_enrichment_tables(&mut self) -> &'static enrichment::TableRegistry {
async fn load_enrichment_tables(&mut self) -> &'static vector_lib::enrichment::TableRegistry {
let mut enrichment_tables = HashMap::new();

// Build enrichment tables
Expand Down Expand Up @@ -389,7 +389,10 @@ impl<'a> Builder<'a> {
source_tasks
}

async fn build_transforms(&mut self, enrichment_tables: &enrichment::TableRegistry) {
async fn build_transforms(
&mut self,
enrichment_tables: &vector_lib::enrichment::TableRegistry,
) {
let mut definition_cache = HashMap::default();

for (key, transform) in self
Expand Down Expand Up @@ -490,7 +493,7 @@ impl<'a> Builder<'a> {
}
}

async fn build_sinks(&mut self, enrichment_tables: &enrichment::TableRegistry) {
async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
for (key, sink) in self
.config
.sinks()
Expand Down Expand Up @@ -716,7 +719,7 @@ struct TransformNode {
impl TransformNode {
pub fn from_parts(
key: ComponentKey,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
transform: &TransformOuter<OutputId>,
schema_definition: &[(OutputId, Definition)],
global_log_namespace: LogNamespace,
Expand Down
18 changes: 9 additions & 9 deletions src/topology/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
pub fn possible_definitions(
inputs: &[OutputId],
config: &dyn ComponentContainer,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
cache: &mut Cache,
) -> Result<Vec<(OutputId, Definition)>, Error> {
if inputs.is_empty() {
Expand Down Expand Up @@ -109,7 +109,7 @@ pub fn possible_definitions(
/// 5` being expanded into two individual routes (So1 -> T3 -> T5 -> Si1 AND So1 -> T4 -> T5 ->
/// Si1).
pub(super) fn expanded_definitions(
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
inputs: &[OutputId],
config: &dyn ComponentContainer,
cache: &mut Cache,
Expand Down Expand Up @@ -210,7 +210,7 @@ pub(super) fn expanded_definitions(
pub(crate) fn input_definitions(
inputs: &[OutputId],
config: &Config,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
cache: &mut Cache,
) -> Result<Vec<(OutputId, Definition)>, Error> {
if inputs.is_empty() {
Expand Down Expand Up @@ -301,7 +301,7 @@ pub(super) fn validate_sink_expectations(
key: &ComponentKey,
sink: &SinkOuter<OutputId>,
config: &topology::Config,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
) -> Result<(), Vec<String>> {
let mut errors = vec![];

Expand Down Expand Up @@ -352,7 +352,7 @@ pub trait ComponentContainer {
fn transform_outputs(
&self,
key: &ComponentKey,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, Definition)],
) -> Option<Vec<TransformOutput>>;

Expand All @@ -365,7 +365,7 @@ pub trait ComponentContainer {
&self,
key: &ComponentKey,
port: &Option<String>,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, Definition)],
) -> Result<Option<TransformOutput>, ()> {
if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
Expand Down Expand Up @@ -424,7 +424,7 @@ impl ComponentContainer for Config {
fn transform_outputs(
&self,
key: &ComponentKey,
enrichment_tables: enrichment::TableRegistry,
enrichment_tables: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, Definition)],
) -> Option<Vec<TransformOutput>> {
self.transform(key).map(|source| {
Expand Down Expand Up @@ -474,7 +474,7 @@ mod tests {
fn transform_outputs(
&self,
key: &ComponentKey,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
_: &[(OutputId, Definition)],
) -> Option<Vec<TransformOutput>> {
self.transforms.get(key.id()).cloned().map(|v| v.1)
Expand Down Expand Up @@ -818,7 +818,7 @@ mod tests {
.collect::<Vec<_>>();

let got = expanded_definitions(
enrichment::TableRegistry::default(),
vector_lib::enrichment::TableRegistry::default(),
&inputs,
&case,
&mut HashMap::default(),
Expand Down
2 changes: 1 addition & 1 deletion src/transforms/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl TransformConfig for AggregateConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
_: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
4 changes: 2 additions & 2 deletions src/transforms/aws_ec2_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl TransformConfig for Ec2Metadata {

fn outputs(
&self,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down Expand Up @@ -719,8 +719,8 @@ mod test {
use crate::config::schema::Definition;
use crate::config::{LogNamespace, OutputId, TransformConfig};
use crate::transforms::aws_ec2_metadata::Ec2Metadata;
use enrichment::TableRegistry;
use lookup::OwnedTargetPath;
use vector_lib::enrichment::TableRegistry;
use vrl::owned_value_path;
use vrl::value::Kind;

Expand Down
2 changes: 1 addition & 1 deletion src/transforms/dedupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl TransformConfig for DedupeConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
Loading

0 comments on commit aadde5f

Please sign in to comment.