Skip to content

Commit

Permalink
chore(dedupe transform): Test compliance to instrumentation spec in `…
Browse files Browse the repository at this point in the history
…dedupe` transform (#14388)

Signed-off-by: Pablo Sichert <mail@pablosichert.com>
  • Loading branch information
pablosichert authored Sep 13, 2022
1 parent fb30713 commit 2189555
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 187 deletions.
1 change: 1 addition & 0 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use vector_core::config::LogNamespace;

pub use self::unit_test_components::{
UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
};
use super::{compiler::expand_globs, graph::Graph, OutputId, TransformConfig};
use crate::{
Expand Down
100 changes: 98 additions & 2 deletions src/config/unit_test/unit_test_components.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::Arc;

use futures::{stream, Sink, Stream};
use futures_util::{future, stream::BoxStream, FutureExt, StreamExt};
use tokio::sync::{oneshot, Mutex};
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::{
config::{DataType, Input, Output},
config::{DataType, Input, LogNamespace, Output},
event::Event,
sink::{StreamSink, VectorSink},
};
Expand Down Expand Up @@ -51,6 +51,59 @@ impl SourceConfig for UnitTestSourceConfig {
}
}

/// Configuration for the `unit_test_stream` source.
#[configurable_component(source("unit_test_stream"))]
#[derive(Clone)]
pub struct UnitTestStreamSourceConfig {
#[serde(skip)]
stream: Arc<Mutex<Option<stream::BoxStream<'static, Event>>>>,
}

impl_generate_config_from_default!(UnitTestStreamSourceConfig);

impl UnitTestStreamSourceConfig {
pub fn new(stream: impl Stream<Item = Event> + Send + 'static) -> Self {
Self {
stream: Arc::new(Mutex::new(Some(stream.boxed()))),
}
}
}

impl Default for UnitTestStreamSourceConfig {
fn default() -> Self {
Self::new(stream::empty().boxed())
}
}

impl std::fmt::Debug for UnitTestStreamSourceConfig {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("UnitTestStreamSourceConfig")
.finish()
}
}

#[async_trait::async_trait]
impl SourceConfig for UnitTestStreamSourceConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
let stream = self.stream.lock().await.take().unwrap();
Ok(Box::pin(async move {
let mut out = cx.out;
let _shutdown = cx.shutdown;
out.send_event_stream(stream).await.map_err(|_| ())?;
Ok(())
}))
}

fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<Output> {
vec![Output::default(DataType::all())]
}

fn can_acknowledge(&self) -> bool {
false
}
}

#[derive(Clone)]
pub enum UnitTestSinkCheck {
/// Check all events that are received against the list of conditions.
Expand Down Expand Up @@ -212,6 +265,49 @@ impl StreamSink<Event> for UnitTestSink {
}
}

/// Configuration for the `unit_test_stream` sink.
#[configurable_component(sink("unit_test_stream"))]
#[derive(Clone, Default)]
pub struct UnitTestStreamSinkConfig {
/// Sink that receives the processed events.
#[serde(skip)]
sink: Arc<Mutex<Option<Box<dyn Sink<Event, Error = ()> + Send + Unpin>>>>,
}

impl_generate_config_from_default!(UnitTestStreamSinkConfig);

impl UnitTestStreamSinkConfig {
pub fn new(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
Self {
sink: Arc::new(Mutex::new(Some(Box::new(sink)))),
}
}
}

impl std::fmt::Debug for UnitTestStreamSinkConfig {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.debug_struct("UnitTestStreamSinkConfig").finish()
}
}

#[async_trait::async_trait]
impl SinkConfig for UnitTestStreamSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = self.sink.lock().await.take().unwrap();
let healthcheck = future::ok(()).boxed();

Ok((VectorSink::from_event_sink(sink), healthcheck))
}

fn input(&self) -> Input {
Input::all()
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&AcknowledgementsConfig::DEFAULT
}
}

fn events_to_string(events: &[Event]) -> String {
events
.iter()
Expand Down
7 changes: 6 additions & 1 deletion src/sinks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ use vector_config::{configurable_component, NamedComponent};
pub use vector_core::{config::Input, sink::VectorSink};

use crate::config::{
unit_test::UnitTestSinkConfig, AcknowledgementsConfig, Resource, SinkConfig, SinkContext,
unit_test::{UnitTestSinkConfig, UnitTestStreamSinkConfig},
AcknowledgementsConfig, Resource, SinkConfig, SinkContext,
};

pub type Healthcheck = BoxFuture<'static, crate::Result<()>>;
Expand Down Expand Up @@ -346,6 +347,9 @@ pub enum Sinks {
/// Unit test.
UnitTest(#[configurable(derived)] UnitTestSinkConfig),

/// Unit test stream.
UnitTestStream(#[configurable(derived)] UnitTestStreamSinkConfig),

/// Vector.
#[cfg(feature = "sinks-vector")]
Vector(#[configurable(derived)] vector::VectorConfig),
Expand Down Expand Up @@ -465,6 +469,7 @@ impl NamedComponent for Sinks {
#[cfg(test)]
Self::TestPanic(config) => config.get_component_name(),
Self::UnitTest(config) => config.get_component_name(),
Self::UnitTestStream(config) => config.get_component_name(),
#[cfg(feature = "sinks-vector")]
Self::Vector(config) => config.get_component_name(),
#[cfg(feature = "sinks-websocket")]
Expand Down
9 changes: 8 additions & 1 deletion src/sources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ use vector_config::{configurable_component, NamedComponent};
use vector_core::config::{LogNamespace, Output};
pub use vector_core::source::Source;

use crate::config::{unit_test::UnitTestSourceConfig, Resource, SourceConfig, SourceContext};
use crate::config::{
unit_test::{UnitTestSourceConfig, UnitTestStreamSourceConfig},
Resource, SourceConfig, SourceContext,
};

/// Common build errors
#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -280,6 +283,9 @@ pub enum Sources {
/// Unit test.
UnitTest(#[configurable(derived)] UnitTestSourceConfig),

/// Unit test stream.
UnitTestStream(#[configurable(derived)] UnitTestStreamSourceConfig),

/// Vector.
#[cfg(feature = "sources-vector")]
Vector(#[configurable(derived)] vector::VectorConfig),
Expand Down Expand Up @@ -378,6 +384,7 @@ impl NamedComponent for Sources {
#[cfg(feature = "sources-syslog")]
Self::Syslog(config) => config.get_component_name(),
Self::UnitTest(config) => config.get_component_name(),
Self::UnitTestStream(config) => config.get_component_name(),
#[cfg(feature = "sources-vector")]
Self::Vector(config) => config.get_component_name(),
}
Expand Down
Loading

0 comments on commit 2189555

Please sign in to comment.